You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/03 08:39:25 UTC

[iotdb] branch master updated: [IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c6f9d0a085 [IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)
c6f9d0a085 is described below

commit c6f9d0a0854cdd83fa07decf2960245d972cbf46
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue May 3 16:39:21 2022 +0800

    [IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)
---
 .../iotdb/db/mpp/aggregation/Accumulator.java      |  70 +++++
 .../db/mpp/aggregation/AccumulatorFactory.java     |  71 +++++
 .../iotdb/db/mpp/aggregation/Aggregator.java       | 122 +++++++
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   | 180 +++++++++++
 .../iotdb/db/mpp/aggregation/CountAccumulator.java | 101 ++++++
 .../db/mpp/aggregation/ExtremeAccumulator.java     | 296 +++++++++++++++++
 .../db/mpp/aggregation/FirstValueAccumulator.java  | 319 +++++++++++++++++++
 .../mpp/aggregation/FirstValueDescAccumulator.java |  91 ++++++
 .../db/mpp/aggregation/LastValueAccumulator.java   | 305 ++++++++++++++++++
 .../mpp/aggregation/LastValueDescAccumulator.java  | 135 ++++++++
 .../db/mpp/aggregation/MaxTimeAccumulator.java     | 101 ++++++
 .../db/mpp/aggregation/MaxTimeDescAccumulator.java |  57 ++++
 .../db/mpp/aggregation/MaxValueAccumulator.java    | 270 ++++++++++++++++
 .../db/mpp/aggregation/MinTimeAccumulator.java     | 105 +++++++
 .../MinTimeDescAccumulator.java}                   |  40 +--
 .../db/mpp/aggregation/MinValueAccumulator.java    | 270 ++++++++++++++++
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   | 172 ++++++++++
 .../operator/process/AggregateOperator.java        |  17 +-
 .../operator/process/DeviceViewOperator.java       |   2 +-
 .../source/SeriesAggregateScanOperator.java        | 121 ++++---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   2 +-
 .../plan/parameter/AggregationDescriptor.java      |   4 +
 .../plan/planner/plan/parameter/InputLocation.java |   4 +-
 .../iotdb/db/mpp/aggregation/AccumulatorTest.java  | 349 +++++++++++++++++++++
 .../operator/SeriesAggregateScanOperatorTest.java  | 153 +++++++--
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  38 ++-
 .../read/common/block/column/BinaryColumn.java     |   9 +
 .../read/common/block/column/BooleanColumn.java    |   9 +
 .../tsfile/read/common/block/column/Column.java    |   6 +
 .../read/common/block/column/DoubleColumn.java     |   9 +
 .../read/common/block/column/FloatColumn.java      |   8 +
 .../tsfile/read/common/block/column/IntColumn.java |   8 +
 .../read/common/block/column/LongColumn.java       |   8 +
 .../block/column/RunLengthEncodedColumn.java       |   8 +
 .../read/common/block/column/TimeColumn.java       |   8 +
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 146 ++++++++-
 .../iotdb/tsfile/common/block/TsBlockTest.java     |  49 +++
 .../iotdb/tsfile/read/common/ColumnTest.java       | 322 +++++++++++++++++++
 38 files changed, 3850 insertions(+), 135 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
new file mode 100644
index 0000000000..40c8a08d50
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public interface Accumulator {
+
+  /** Column should be like: | Time | Value | */
+  void addInput(Column[] column, TimeRange timeRange);
+
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single; But for AVG,
+   * last_value, it should be double column with dictionary order.
+   */
+  void addIntermediate(Column[] partialResult);
+
+  /**
+   * This method can only be used in seriesAggregateScanOperator, it will use different statistics
+   * based on the type of Accumulator.
+   */
+  void addStatistics(Statistics statistics);
+
+  /**
+   * Attention: setFinal should be invoked only once, and addInput() and addIntermediate() are not
+   * allowed again.
+   */
+  void setFinal(Column finalResult);
+
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single, so its output column
+   * is single too; But for AVG, last_value, it should be double column with dictionary order.
+   */
+  void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
+
+  /** Final result is single column for any aggregation function. */
+  void outputFinal(ColumnBuilder tsBlockBuilder);
+
+  void reset();
+
+  /**
+   * This method can only be used in seriesAggregateScanOperator. For first_value or last_value in
+   * decreasing order, we can get final result by the first record.
+   */
+  boolean hasFinalResult();
+
+  TSDataType[] getIntermediateType();
+
+  TSDataType getFinalType();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java
new file mode 100644
index 0000000000..ed4a416d9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatorFactory {
+
+  // TODO: Are we going to create different seriesScanOperator based on order by sequence?
+  public static Accumulator createAccumulator(
+      AggregationType aggregationType, TSDataType tsDataType, boolean ascending) {
+    switch (aggregationType) {
+      case COUNT:
+        return new CountAccumulator();
+      case AVG:
+        return new AvgAccumulator(tsDataType);
+      case SUM:
+        return new SumAccumulator(tsDataType);
+      case EXTREME:
+        return new ExtremeAccumulator(tsDataType);
+      case MAX_TIME:
+        return ascending ? new MaxTimeAccumulator() : new MaxTimeDescAccumulator();
+      case MIN_TIME:
+        return ascending ? new MinTimeAccumulator() : new MinTimeDescAccumulator();
+      case MAX_VALUE:
+        return new MaxValueAccumulator(tsDataType);
+      case MIN_VALUE:
+        return new MinValueAccumulator(tsDataType);
+      case LAST_VALUE:
+        return ascending
+            ? new LastValueAccumulator(tsDataType)
+            : new LastValueDescAccumulator(tsDataType);
+      case FIRST_VALUE:
+        return ascending
+            ? new FirstValueAccumulator(tsDataType)
+            : new FirstValueDescAccumulator(tsDataType);
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
+    }
+  }
+
+  public static List<Accumulator> createAccumulators(
+      List<AggregationType> aggregationTypes, TSDataType tsDataType, boolean ascending) {
+    List<Accumulator> accumulators = new ArrayList<>();
+    for (AggregationType aggregationType : aggregationTypes) {
+      accumulators.add(createAccumulator(aggregationType, tsDataType, ascending));
+    }
+    return accumulators;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
new file mode 100644
index 0000000000..a0a7b87f1b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class Aggregator {
+
+  private final Accumulator accumulator;
+  // In some intermediate result input, inputLocation[] should include two columns
+  private List<InputLocation[]> inputLocationList;
+  private final AggregationStep step;
+
+  private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
+
+  // Used for SeriesAggregateScanOperator
+  public Aggregator(Accumulator accumulator, AggregationStep step) {
+    this.accumulator = accumulator;
+    this.step = step;
+  }
+
+  // Used for aggregateOperator
+  public Aggregator(
+      Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
+    this.accumulator = accumulator;
+    this.step = step;
+    this.inputLocationList = inputLocationList;
+  }
+
+  // Used for SeriesAggregateScanOperator
+  public void processTsBlock(TsBlock tsBlock) {
+    checkArgument(
+        step.isInputRaw(), "Step in SeriesAggregateScanOperator can only process raw input");
+    // TODO Aligned TimeSeries
+    accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+  }
+
+  // Used for aggregateOperator
+  public void processTsBlocks(TsBlock[] tsBlock) {
+    for (InputLocation[] inputLocations : inputLocationList) {
+      if (step.isInputRaw()) {
+        TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()];
+        Column[] timeValueColumn = new Column[2];
+        timeValueColumn[0] = rawTsBlock.getTimeColumn();
+        timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeValueColumn, timeRange);
+      } else {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
+        }
+        accumulator.addIntermediate(columns);
+      }
+    }
+  }
+
+  public void outputResult(ColumnBuilder[] columnBuilder) {
+    if (step.isOutputPartial()) {
+      accumulator.outputIntermediate(columnBuilder);
+    } else {
+      accumulator.outputFinal(columnBuilder[0]);
+    }
+  }
+
+  public void processStatistics(Statistics statistics) {
+    accumulator.addStatistics(statistics);
+  }
+
+  public TSDataType[] getOutputType() {
+    if (step.isOutputPartial()) {
+      return accumulator.getIntermediateType();
+    } else {
+      return new TSDataType[] {accumulator.getFinalType()};
+    }
+  }
+
+  public void reset() {
+    accumulator.reset();
+  }
+
+  public boolean hasFinalResult() {
+    return accumulator.hasFinalResult();
+  }
+
+  public void setTimeRange(TimeRange timeRange) {
+    this.timeRange = timeRange;
+  }
+
+  public TimeRange getTimeRange() {
+    return timeRange;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
new file mode 100644
index 0000000000..3c726f9603
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class AvgAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private long countValue;
+  private double sumValue;
+
+  public AvgAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | countValue1 | sumValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 2, "partialResult of Avg should be 2");
+    countValue += partialResult[0].getLong(0);
+    sumValue += partialResult[1].getDouble(0);
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
+  }
+
+  // Set sumValue to finalResult and keep countValue equals to 1
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    countValue = 1;
+    sumValue = finalResult.getDouble(0);
+  }
+
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 2, "partialResult of Avg should be 2");
+    columnBuilders[0].writeLong(countValue);
+    columnBuilders[1].writeDouble(sumValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeDouble(sumValue / countValue);
+  }
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+    this.sumValue = 0.0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        countValue++;
+        sumValue += column[1].getInt(i);
+      }
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        countValue++;
+        sumValue += column[1].getLong(i);
+      }
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        countValue++;
+        sumValue += column[1].getFloat(i);
+      }
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        countValue++;
+        sumValue += column[1].getDouble(i);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
new file mode 100644
index 0000000000..c2825b00bb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CountAccumulator implements Accumulator {
+
+  private long countValue = 0;
+
+  public CountAccumulator() {}
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        countValue++;
+      }
+    }
+  }
+
+  // partialResult should be like: | partialCountValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of Count should be 1");
+    countValue += partialResult[0].getLong(0);
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+  }
+
+  // finalResult should be single column, like: | finalCountValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    countValue = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in countAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of Count should be 1");
+    columnBuilders[0].writeLong(countValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(countValue);
+  }
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
new file mode 100644
index 0000000000..e70a7f484a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class ExtremeAccumulator implements Accumulator {
+
+  private final TSDataType seriesDataType;
+  private TsPrimitiveType extremeResult;
+  private boolean initResult;
+
+  public ExtremeAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | PartialExtremeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of ExtremeValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult(partialResult[0].getInt(0));
+        break;
+      case INT64:
+        updateLongResult(partialResult[0].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatResult(partialResult[0].getFloat(0));
+        break;
+      case DOUBLE:
+        updateDoubleResult(partialResult[0].getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult((int) statistics.getMaxValue());
+        updateIntResult((int) statistics.getMinValue());
+        break;
+      case INT64:
+        updateLongResult((long) statistics.getMaxValue());
+        updateLongResult((long) statistics.getMinValue());
+        break;
+      case FLOAT:
+        updateFloatResult((float) statistics.getMaxValue());
+        updateFloatResult((float) statistics.getMinValue());
+        break;
+      case DOUBLE:
+        updateDoubleResult((double) statistics.getMaxValue());
+        updateDoubleResult((double) statistics.getMinValue());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void setFinal(Column finalResult) {
+    extremeResult.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be single in ExtremeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of ExtremeValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilders[0].writeInt(extremeResult.getInt());
+        break;
+      case INT64:
+        columnBuilders[0].writeLong(extremeResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilders[0].writeFloat(extremeResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilders[0].writeDouble(extremeResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(extremeResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(extremeResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(extremeResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(extremeResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    extremeResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {extremeResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return extremeResult.getDataType();
+  }
+
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
+      }
+    }
+  }
+
+  private void updateIntResult(int extVal) {
+    int absExtVal = Math.abs(extVal);
+    int candidateResult = extremeResult.getInt();
+    int absCandidateResult = Math.abs(extremeResult.getInt());
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+      initResult = true;
+      extremeResult.setInt(extVal);
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
+      }
+    }
+  }
+
+  private void updateLongResult(long extVal) {
+    long absExtVal = Math.abs(extVal);
+    long candidateResult = extremeResult.getLong();
+    long absCandidateResult = Math.abs(extremeResult.getLong());
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+      initResult = true;
+      extremeResult.setLong(extVal);
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
+      }
+    }
+  }
+
+  private void updateFloatResult(float extVal) {
+    float absExtVal = Math.abs(extVal);
+    float candidateResult = extremeResult.getFloat();
+    float absCandidateResult = Math.abs(extremeResult.getFloat());
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+      initResult = true;
+      extremeResult.setFloat(extVal);
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
+      }
+    }
+  }
+
+  private void updateDoubleResult(double extVal) {
+    double absExtVal = Math.abs(extVal);
+    double candidateResult = extremeResult.getDouble();
+    double absCandidateResult = Math.abs(extremeResult.getDouble());
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+      initResult = true;
+      extremeResult.setDouble(extVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
new file mode 100644
index 0000000000..45bce52036
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class FirstValueAccumulator implements Accumulator {
+
+  protected final TSDataType seriesDataType;
+  protected boolean hasCandidateResult;
+  protected TsPrimitiveType firstValue;
+  protected long minTime = Long.MAX_VALUE;
+
+  public FirstValueAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    firstValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+        addBinaryInput(column, timeRange);
+        break;
+      case BOOLEAN:
+        addBooleanInput(column, timeRange);
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | FirstValue | MinTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 2, "partialResult of FirstValue should be 2");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntFirstValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
+        break;
+      case INT64:
+        updateLongFirstValue(partialResult[0].getLong(0), partialResult[1].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatFirstValue(partialResult[0].getFloat(0), partialResult[1].getLong(0));
+        break;
+      case DOUBLE:
+        updateDoubleFirstValue(partialResult[0].getDouble(0), partialResult[1].getLong(0));
+        break;
+      case TEXT:
+        updateBinaryFirstValue(partialResult[0].getBinary(0), partialResult[1].getLong(0));
+        break;
+      case BOOLEAN:
+        updateBooleanFirstValue(partialResult[0].getBoolean(0), partialResult[1].getLong(0));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntFirstValue((int) statistics.getFirstValue(), statistics.getStartTime());
+        break;
+      case INT64:
+        updateLongFirstValue((long) statistics.getFirstValue(), statistics.getStartTime());
+        break;
+      case FLOAT:
+        updateFloatFirstValue((float) statistics.getFirstValue(), statistics.getStartTime());
+        break;
+      case DOUBLE:
+        updateDoubleFirstValue((double) statistics.getFirstValue(), statistics.getStartTime());
+        break;
+      case TEXT:
+        updateBinaryFirstValue((Binary) statistics.getFirstValue(), statistics.getStartTime());
+      case BOOLEAN:
+        updateBooleanFirstValue((boolean) statistics.getFirstValue(), statistics.getStartTime());
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+    }
+  }
+
+  // finalResult should be single column, like: | finalFirstValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    firstValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in FirstValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 2, "partialResult of FirstValue should be 2");
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilders[0].writeInt(firstValue.getInt());
+        break;
+      case INT64:
+        columnBuilders[0].writeLong(firstValue.getLong());
+        break;
+      case FLOAT:
+        columnBuilders[0].writeFloat(firstValue.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilders[0].writeDouble(firstValue.getDouble());
+        break;
+      case TEXT:
+        columnBuilders[0].writeBinary(firstValue.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilders[0].writeBoolean(firstValue.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+    columnBuilders[1].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(firstValue.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(firstValue.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(firstValue.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(firstValue.getDouble());
+        break;
+      case TEXT:
+        columnBuilder.writeBinary(firstValue.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(firstValue.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    this.minTime = Long.MAX_VALUE;
+    this.firstValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {firstValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return firstValue.getDataType();
+  }
+
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntFirstValue(column[1].getInt(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateIntFirstValue(int value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setInt(value);
+    }
+  }
+
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongFirstValue(column[1].getLong(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateLongFirstValue(long value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setLong(value);
+    }
+  }
+
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatFirstValue(column[1].getFloat(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateFloatFirstValue(float value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setFloat(value);
+    }
+  }
+
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleFirstValue(column[1].getDouble(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateDoubleFirstValue(double value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setDouble(value);
+    }
+  }
+
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanFirstValue(column[1].getBoolean(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateBooleanFirstValue(boolean value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setBoolean(value);
+    }
+  }
+
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryFirstValue(column[1].getBinary(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateBinaryFirstValue(Binary value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setBinary(value);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
new file mode 100644
index 0000000000..08c04b8185
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class FirstValueDescAccumulator extends FirstValueAccumulator {
+
+  public FirstValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  // Don't break in advance
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntFirstValue(column[1].getInt(i), curTime);
+      }
+    }
+  }
+
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongFirstValue(column[1].getLong(i), curTime);
+      }
+    }
+  }
+
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatFirstValue(column[1].getFloat(i), curTime);
+      }
+    }
+  }
+
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleFirstValue(column[1].getDouble(i), curTime);
+      }
+    }
+  }
+
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanFirstValue(column[1].getBoolean(i), curTime);
+      }
+    }
+  }
+
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryFirstValue(column[1].getBinary(i), curTime);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
new file mode 100644
index 0000000000..db418a490e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class LastValueAccumulator implements Accumulator {
+
+  protected final TSDataType seriesDataType;
+  protected TsPrimitiveType lastValue;
+  protected long maxTime = Long.MIN_VALUE;
+
+  public LastValueAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    lastValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+        addBinaryInput(column, timeRange);
+        break;
+      case BOOLEAN:
+        addBooleanInput(column, timeRange);
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | LastValue | MaxTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 2, "partialResult of LastValue should be 2");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntLastValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
+        break;
+      case INT64:
+        updateLongLastValue(partialResult[0].getLong(0), partialResult[1].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatLastValue(partialResult[0].getFloat(0), partialResult[1].getLong(0));
+        break;
+      case DOUBLE:
+        updateDoubleLastValue(partialResult[0].getDouble(0), partialResult[1].getLong(0));
+        break;
+      case TEXT:
+        updateBinaryLastValue(partialResult[0].getBinary(0), partialResult[1].getLong(0));
+        break;
+      case BOOLEAN:
+        updateBooleanLastValue(partialResult[0].getBoolean(0), partialResult[1].getLong(0));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntLastValue((int) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case INT64:
+        updateLongLastValue((long) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case FLOAT:
+        updateFloatLastValue((float) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case DOUBLE:
+        updateDoubleLastValue((double) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case TEXT:
+        updateBinaryLastValue((Binary) statistics.getLastValue(), statistics.getEndTime());
+      case BOOLEAN:
+        updateBooleanLastValue((boolean) statistics.getLastValue(), statistics.getEndTime());
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  // finalResult should be single column, like: | finalLastValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    lastValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in LastValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 2, "partialResult of LastValue should be 2");
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilders[0].writeInt(lastValue.getInt());
+        break;
+      case INT64:
+        columnBuilders[0].writeLong(lastValue.getLong());
+        break;
+      case FLOAT:
+        columnBuilders[0].writeFloat(lastValue.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilders[0].writeDouble(lastValue.getDouble());
+        break;
+      case TEXT:
+        columnBuilders[0].writeBinary(lastValue.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilders[0].writeBoolean(lastValue.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+    columnBuilders[1].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(lastValue.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(lastValue.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(lastValue.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(lastValue.getDouble());
+        break;
+      case TEXT:
+        columnBuilder.writeBinary(lastValue.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(lastValue.getBoolean());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+    this.lastValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {lastValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return lastValue.getDataType();
+  }
+
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntLastValue(column[1].getInt(i), curTime);
+      }
+    }
+  }
+
+  protected void updateIntLastValue(int value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setInt(value);
+    }
+  }
+
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongLastValue(column[1].getLong(i), curTime);
+      }
+    }
+  }
+
+  protected void updateLongLastValue(long value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setLong(value);
+    }
+  }
+
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatLastValue(column[1].getFloat(i), curTime);
+      }
+    }
+  }
+
+  protected void updateFloatLastValue(float value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setFloat(value);
+    }
+  }
+
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleLastValue(column[1].getDouble(i), curTime);
+      }
+    }
+  }
+
+  protected void updateDoubleLastValue(double value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setDouble(value);
+    }
+  }
+
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), curTime);
+      }
+    }
+  }
+
+  protected void updateBooleanLastValue(boolean value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setBoolean(value);
+    }
+  }
+
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryLastValue(column[1].getBinary(i), curTime);
+      }
+    }
+  }
+
+  protected void updateBinaryLastValue(Binary value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setBinary(value);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
new file mode 100644
index 0000000000..3360bad7a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class LastValueDescAccumulator extends LastValueAccumulator {
+
+  private boolean hasCandidateResult = false;
+
+  public LastValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntLastValue(column[1].getInt(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongLastValue(column[1].getLong(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatLastValue(column[1].getFloat(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleLastValue(column[1].getDouble(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryLastValue(column[1].getBinary(i), curTime);
+        break;
+      }
+    }
+  }
+
+  protected void updateIntLastValue(int value, long curTime) {
+    hasCandidateResult = true;
+    super.updateIntLastValue(value, curTime);
+  }
+
+  protected void updateLongLastValue(long value, long curTime) {
+    hasCandidateResult = true;
+    super.updateLongLastValue(value, curTime);
+  }
+
+  protected void updateFloatLastValue(float value, long curTime) {
+    hasCandidateResult = true;
+    super.updateFloatLastValue(value, curTime);
+  }
+
+  protected void updateDoubleLastValue(double value, long curTime) {
+    hasCandidateResult = true;
+    super.updateDoubleLastValue(value, curTime);
+  }
+
+  protected void updateBooleanLastValue(boolean value, long curTime) {
+    hasCandidateResult = true;
+    super.updateBooleanLastValue(value, curTime);
+  }
+
+  protected void updateBinaryLastValue(Binary value, long curTime) {
+    hasCandidateResult = true;
+    super.updateBinaryLastValue(value, curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
new file mode 100644
index 0000000000..ce47210337
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MaxTimeAccumulator implements Accumulator {
+
+  protected long maxTime = Long.MIN_VALUE;
+
+  public MaxTimeAccumulator() {}
+
+  // Column should be like: | Time | Value |
+  // Value is used to judge isNull()
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateMaxTime(curTime);
+      }
+    }
+  }
+
+  // partialResult should be like: | partialMaxTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of MaxTime should be 1");
+    updateMaxTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMaxTime(statistics.getEndTime());
+  }
+
+  // finalResult should be single column, like: | finalMaxTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    maxTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in maxTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of MaxTime should be 1");
+    columnBuilders[0].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(maxTime);
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  protected void updateMaxTime(long curTime) {
+    maxTime = Math.max(maxTime, curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
new file mode 100644
index 0000000000..b753e9ca3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
+
+  private boolean hasCandidateResult = false;
+
+  // Column should be like: | Time | Value |
+  // Value is used to judge isNull()
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateMaxTime(curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
+  protected void updateMaxTime(long curTime) {
+    hasCandidateResult = true;
+    super.updateMaxTime(curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
new file mode 100644
index 0000000000..c07480a13d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MaxValueAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private TsPrimitiveType maxResult;
+  private boolean initResult;
+
+  public MaxValueAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    this.maxResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | partialMaxValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of MaxValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult(partialResult[0].getInt(0));
+        break;
+      case INT64:
+        updateLongResult(partialResult[0].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatResult(partialResult[0].getFloat(0));
+        break;
+      case DOUBLE:
+        updateDoubleResult(partialResult[0].getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult((int) statistics.getMaxValue());
+        break;
+      case INT64:
+        updateLongResult((long) statistics.getMaxValue());
+        break;
+      case FLOAT:
+        updateFloatResult((float) statistics.getMaxValue());
+        break;
+      case DOUBLE:
+        updateDoubleResult((double) statistics.getMaxValue());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
+  }
+
+  // finalResult should be single column, like: | finalCountValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    maxResult.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be single in countAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of MaxValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilders[0].writeInt(maxResult.getInt());
+        break;
+      case INT64:
+        columnBuilders[0].writeLong(maxResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilders[0].writeFloat(maxResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilders[0].writeDouble(maxResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(maxResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(maxResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(maxResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(maxResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    this.maxResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {maxResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return maxResult.getDataType();
+  }
+
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
+      }
+    }
+  }
+
+  private void updateIntResult(int maxVal) {
+    if (!initResult || maxVal > maxResult.getInt()) {
+      initResult = true;
+      maxResult.setInt(maxVal);
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
+      }
+    }
+  }
+
+  private void updateLongResult(long maxVal) {
+    if (!initResult || maxVal > maxResult.getLong()) {
+      initResult = true;
+      maxResult.setLong(maxVal);
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
+      }
+    }
+  }
+
+  private void updateFloatResult(float maxVal) {
+    if (!initResult || maxVal > maxResult.getFloat()) {
+      initResult = true;
+      maxResult.setFloat(maxVal);
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
+      }
+    }
+  }
+
+  private void updateDoubleResult(double maxVal) {
+    if (!initResult || maxVal > maxResult.getDouble()) {
+      initResult = true;
+      maxResult.setDouble(maxVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
new file mode 100644
index 0000000000..a6af85093d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MinTimeAccumulator implements Accumulator {
+
+  protected boolean hasCandidateResult;
+  protected long minTime = Long.MAX_VALUE;
+
+  public MinTimeAccumulator() {}
+
+  // Column should be like: | Time | Value |
+  // Value is used to judge isNull()
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateMinTime(curTime);
+        break;
+      }
+    }
+  }
+
+  // partialResult should be like: | partialMinTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of MinTime should be 1");
+    updateMinTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMinTime(statistics.getStartTime());
+  }
+
+  // finalResult should be single column, like: | finalMinTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    minTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in minTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of MinTime should be 1");
+    columnBuilders[0].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(minTime);
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    this.minTime = Long.MAX_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  protected void updateMinTime(long curTime) {
+    hasCandidateResult = true;
+    minTime = Math.min(minTime, curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 4ee69b14e7..e97fa3ca70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -16,42 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
 
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+package org.apache.iotdb.db.mpp.aggregation;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class AggregateOperator implements ProcessOperator {
+public class MinTimeDescAccumulator extends MinTimeAccumulator {
 
   @Override
-  public OperatorContext getOperatorContext() {
-    return null;
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMinTime(curTime);
+      }
+    }
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
-
-  @Override
-  public boolean isFinished() {
+  public boolean hasFinalResult() {
     return false;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
new file mode 100644
index 0000000000..8b24de07ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MinValueAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private TsPrimitiveType minResult;
+  private boolean initResult = false;
+
+  public MinValueAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    this.minResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | partialMinValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of MinValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult(partialResult[0].getInt(0));
+        break;
+      case INT64:
+        updateLongResult(partialResult[0].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatResult(partialResult[0].getFloat(0));
+        break;
+      case DOUBLE:
+        updateDoubleResult(partialResult[0].getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult((int) statistics.getMinValue());
+        break;
+      case INT64:
+        updateLongResult((long) statistics.getMinValue());
+        break;
+      case FLOAT:
+        updateFloatResult((float) statistics.getMinValue());
+        break;
+      case DOUBLE:
+        updateDoubleResult((double) statistics.getMinValue());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
+  }
+
+  // finalResult should be single column, like: | finalCountValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    minResult.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be single in MinValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of MinValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilders[0].writeInt(minResult.getInt());
+        break;
+      case INT64:
+        columnBuilders[0].writeLong(minResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilders[0].writeFloat(minResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilders[0].writeDouble(minResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(minResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(minResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(minResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(minResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    this.minResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {minResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return minResult.getDataType();
+  }
+
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
+      }
+    }
+  }
+
+  private void updateIntResult(int minVal) {
+    if (!initResult || minVal < minResult.getInt()) {
+      initResult = true;
+      minResult.setInt(minVal);
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
+      }
+    }
+  }
+
+  private void updateLongResult(long minVal) {
+    if (!initResult || minVal < minResult.getLong()) {
+      initResult = true;
+      minResult.setLong(minVal);
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
+      }
+    }
+  }
+
+  private void updateFloatResult(float minVal) {
+    if (!initResult || minVal < minResult.getFloat()) {
+      initResult = true;
+      minResult.setFloat(minVal);
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
+      }
+    }
+  }
+
+  private void updateDoubleResult(double minVal) {
+    if (!initResult || minVal < minResult.getDouble()) {
+      initResult = true;
+      minResult.setDouble(minVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
new file mode 100644
index 0000000000..132fb017a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class SumAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private double sumValue = 0;
+
+  public SumAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | partialSumValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of Sum should be 1");
+    sumValue += partialResult[0].getDouble(0);
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
+  }
+
+  // finalResult should be single column, like: | finalSumValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    sumValue = finalResult.getDouble(0);
+  }
+
+  // columnBuilder should be single in countAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of Sum should be 1");
+    columnBuilders[0].writeDouble(sumValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeDouble(sumValue);
+  }
+
+  @Override
+  public void reset() {
+    this.sumValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getInt(i);
+      }
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getLong(i);
+      }
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getFloat(i);
+      }
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getDouble(i);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
index 4ee69b14e7..71e7817b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
@@ -18,16 +18,31 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.List;
+
 public class AggregateOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+  private final List<Aggregator> aggregators;
+  private final List<Operator> children;
+
+  public AggregateOperator(
+      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.aggregators = aggregators;
+    this.children = children;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 04658826c2..a90dcfd0bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -114,7 +114,7 @@ public class DeviceViewOperator implements ProcessOperator {
     }
     // construct device column
     ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
-    deviceColumnBuilder.writeObject(new Binary(getCurDeviceName()));
+    deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName()));
     newValueColumns[0] =
         new RunLengthEncodedColumn(deviceColumnBuilder.build(), tsBlock.getPositionCount());
     // construct other null columns
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index 842b907bcc..e5d657713f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -19,18 +19,15 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -44,9 +41,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
@@ -62,15 +59,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   private final PlanNodeId sourceId;
   private final SeriesScanUtil seriesScanUtil;
   private final boolean ascending;
-  private List<AggregateResult> aggregateResultList;
+  // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step.
+  // But in facing of statistics, it will invoke another method processStatistics()
+  private List<Aggregator> aggregators;
 
   private ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
-  private TsBlockSingleColumnIterator preCachedData;
-  // used for resetting the preCachedData to the last read index
-  private int lastReadIndex;
+  private TsBlock preCachedData;
 
   private TsBlockBuilder tsBlockBuilder;
   private TsBlock resultTsBlock;
@@ -82,7 +79,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       PartialPath seriesPath,
       Set<String> allSensors,
       OperatorContext context,
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
@@ -98,21 +95,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             timeFilter,
             null,
             ascending);
-    aggregateResultList = new ArrayList<>(aggregateFuncList.size());
-    for (AggregationType aggregationType : aggregateFuncList) {
-      aggregateResultList.add(
-          AggregateResultFactory.getAggrResultByType(
-              aggregationType,
-              seriesPath.getSeriesType(),
-              seriesScanUtil.getOrderUtils().getAscending()));
+    this.aggregators = aggregators;
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    tsBlockBuilder =
-        new TsBlockBuilder(
-            aggregateFuncList.stream()
-                .map(
-                    functionType ->
-                        SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name()))
-                .collect(Collectors.toList()));
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
   }
 
@@ -169,8 +157,9 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       curTimeRange = timeRangeIterator.nextTimeRange();
 
       // 1. Clear previous aggregation result
-      for (AggregateResult result : aggregateResultList) {
-        result.reset();
+      for (Aggregator aggregator : aggregators) {
+        aggregator.reset();
+        aggregator.setTimeRange(curTimeRange);
       }
 
       // 2. Calculate aggregation result based on current time window
@@ -226,14 +215,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   private void updateResultTsBlockUsingAggregateResult() {
-    // TODO AVG
     tsBlockBuilder.reset();
     TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(curTimeRange.getMin());
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = 0; i < aggregateResultList.size(); i++) {
-      columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+    int columnIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
     resultTsBlock = tsBlockBuilder.build();
@@ -273,41 +267,33 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   @SuppressWarnings("squid:S3776")
-  private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange)
-      throws IOException {
+  private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (!satisfied(blockIterator, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
       return;
     }
 
-    for (AggregateResult result : aggregateResultList) {
+    // skip points that cannot be calculated
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+
+    for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
-      if (result.hasFinalResult()) {
+      if (aggregator.hasFinalResult()) {
         continue;
       }
-      // lazy reset batch data for calculation
-      blockIterator.setRowIndex(lastReadIndex);
-      // skip points that cannot be calculated
-      skipOutOfTimeRangePoints(blockIterator, curTimeRange);
-
-      if (blockIterator.hasNext()) {
-        result.updateResultFromPageData(
-            blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
-      }
-    }
 
-    // reset the last position to current Index
-    lastReadIndex = blockIterator.getRowIndex();
+      aggregator.processTsBlock(tsBlock);
+    }
 
     // can calc for next interval
-    if (blockIterator.hasNext()) {
-      preCachedData = blockIterator;
+    if (tsBlock.getTsBlockSingleColumnIterator().hasNext()) {
+      preCachedData = tsBlock;
     }
   }
 
   // skip points that cannot be calculated
-  private void skipOutOfTimeRangePoints(
-      TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (ascending) {
       while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
         tsBlockIterator.next();
@@ -317,9 +303,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         tsBlockIterator.next();
       }
     }
+    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
   }
 
-  private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
     }
@@ -332,15 +320,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     if (!ascending
         && (tsBlockIterator.getStartTime() >= timeRange.getMax()
             || tsBlockIterator.currentTime() < timeRange.getMin())) {
-      preCachedData = tsBlockIterator;
+      preCachedData = tsBlock;
       return false;
     }
     return true;
   }
 
   private boolean isEndCalc() {
-    for (AggregateResult result : aggregateResultList) {
-      if (!result.hasFinalResult()) {
+    for (Aggregator aggregator : aggregators) {
+      if (!aggregator.hasFinalResult()) {
         return false;
       }
     }
@@ -375,23 +363,23 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       }
 
       // calc from page data
-      TsBlockSingleColumnIterator tsBlockIterator =
-          seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+      TsBlock tsBlock = seriesScanUtil.nextPage();
+      TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
       if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
         continue;
       }
 
       // reset the last position to current Index
-      lastReadIndex = tsBlockIterator.getRowIndex();
+      // lastReadIndex = tsBlockIterator.getRowIndex();
 
       // stop calc and cached current batchData
       if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
-        preCachedData = tsBlockIterator;
+        preCachedData = tsBlock;
         return true;
       }
 
       // calc from batch data
-      calcFromBatch(tsBlockIterator, curTimeRange);
+      calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
       if (isEndCalc()
@@ -432,15 +420,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   private void calcFromStatistics(Statistics statistics) {
-    try {
-      for (AggregateResult result : aggregateResultList) {
-        if (result.hasFinalResult()) {
-          continue;
-        }
-        result.updateResultFromStatistics(statistics);
+    for (int i = 0; i < aggregators.size(); i++) {
+      Aggregator aggregator = aggregators.get(i);
+      if (aggregator.hasFinalResult()) {
+        continue;
       }
-    } catch (QueryProcessException e) {
-      throw new RuntimeException("Error while updating result using statistics", e);
+      aggregator.processStatistics(statistics);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index fb876a35ac..4f42b6871c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -321,7 +321,7 @@ public class LocalExecutionPlanner {
               seriesPath,
               node.getAllSensors(),
               operatorContext,
-              node.getAggregateFuncList(),
+              null,
               node.getTimeFilter(),
               ascending,
               node.getGroupByTimeParameter());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index f9d2006473..e7f0b42e36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -59,6 +59,10 @@ public class AggregationDescriptor {
     return aggregationType;
   }
 
+  public AggregationStep getStep() {
+    return step;
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     step.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
index 0350c7db0f..d7fcca9e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class InputLocation {
-  // which input tsblock
+  // which input tsBlock
   private final int tsBlockIndex;
-  // which value column of that tsblock
+  // which value column of that tsBlock
   private final int valueColumnIndex;
 
   public InputLocation(int tsBlockIndex, int valueColumnIndex) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
new file mode 100644
index 0000000000..6fe08cd345
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatorTest {
+
+  private TsBlock rawData;
+  private Statistics statistics;
+  private TimeRange defaultTimeRange = new TimeRange(0, Long.MAX_VALUE);
+
+  @Before
+  public void setUp() {
+    initInputTsBlock();
+  }
+
+  private void initInputTsBlock() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE);
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    for (int i = 0; i < 100; i++) {
+      timeColumnBuilder.writeLong(i);
+      columnBuilders[0].writeDouble(i * 1.0);
+      tsBlockBuilder.declarePosition();
+    }
+    rawData = tsBlockBuilder.build();
+
+    statistics = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics.update(100L, 100d);
+  }
+
+  @Test
+  public void avgAccumulatorTest() {
+    Accumulator avgAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.AVG, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.INT64, avgAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.DOUBLE, avgAccumulator.getIntermediateType()[1]);
+    Assert.assertEquals(TSDataType.DOUBLE, avgAccumulator.getFinalType());
+
+    avgAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(avgAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+    intermediateResult[0] = new LongColumnBuilder(null, 1);
+    intermediateResult[1] = new DoubleColumnBuilder(null, 1);
+    avgAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(100, intermediateResult[0].build().getLong(0));
+    Assert.assertEquals(4950d, intermediateResult[1].build().getDouble(0), 0.001);
+
+    // add intermediate result as input
+    avgAccumulator.addIntermediate(
+        new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    avgAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(49.5d, finalResult.build().getDouble(0), 0.001);
+
+    avgAccumulator.reset();
+    avgAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    avgAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void countAccumulatorTest() {
+    Accumulator countAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.INT64, countAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT64, countAccumulator.getFinalType());
+
+    countAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(countAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new LongColumnBuilder(null, 1);
+    countAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(100, intermediateResult[0].build().getLong(0));
+
+    // add intermediate result as input
+    countAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+    countAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(200, finalResult.build().getLong(0));
+
+    countAccumulator.reset();
+    countAccumulator.addStatistics(statistics);
+    finalResult = new LongColumnBuilder(null, 1);
+    countAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(1, finalResult.build().getLong(0));
+  }
+
+  @Test
+  public void extremeAccumulatorTest() {
+    Accumulator extremeAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.EXTREME, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+    extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(extremeAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+
+    // add intermediate result as input
+    extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(99d, finalResult.build().getDouble(0), 0.001);
+
+    extremeAccumulator.reset();
+    extremeAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void firstValueAccumulatorTest() {
+    Accumulator firstValueAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.FIRST_VALUE, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, firstValueAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT64, firstValueAccumulator.getIntermediateType()[1]);
+    Assert.assertEquals(TSDataType.DOUBLE, firstValueAccumulator.getFinalType());
+
+    firstValueAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertTrue(firstValueAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    intermediateResult[1] = new LongColumnBuilder(null, 1);
+    firstValueAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(0d, intermediateResult[0].build().getDouble(0), 0.001);
+    Assert.assertEquals(0L, intermediateResult[1].build().getLong(0));
+
+    // add intermediate result as input
+    firstValueAccumulator.addIntermediate(
+        new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    firstValueAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(0L, finalResult.build().getDouble(0), 0.001);
+
+    firstValueAccumulator.reset();
+    firstValueAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    firstValueAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void lastValueAccumulatorTest() {
+    Accumulator lastValueAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.LAST_VALUE, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, lastValueAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT64, lastValueAccumulator.getIntermediateType()[1]);
+    Assert.assertEquals(TSDataType.DOUBLE, lastValueAccumulator.getFinalType());
+
+    lastValueAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    intermediateResult[1] = new LongColumnBuilder(null, 1);
+    lastValueAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+    Assert.assertEquals(99L, intermediateResult[1].build().getLong(0));
+
+    // add intermediate result as input
+    lastValueAccumulator.addIntermediate(
+        new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    lastValueAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(99L, finalResult.build().getDouble(0), 0.001);
+
+    lastValueAccumulator.reset();
+    lastValueAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    lastValueAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void maxTimeAccumulatorTest() {
+    Accumulator maxTimeAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.MAX_TIME, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.INT64, maxTimeAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT64, maxTimeAccumulator.getFinalType());
+
+    maxTimeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(maxTimeAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new LongColumnBuilder(null, 1);
+    maxTimeAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(99, intermediateResult[0].build().getLong(0));
+
+    // add intermediate result as input
+    maxTimeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+    maxTimeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(99, finalResult.build().getLong(0));
+
+    maxTimeAccumulator.reset();
+    maxTimeAccumulator.addStatistics(statistics);
+    finalResult = new LongColumnBuilder(null, 1);
+    maxTimeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100, finalResult.build().getLong(0));
+  }
+
+  @Test
+  public void minTimeAccumulatorTest() {
+    Accumulator minTimeAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.MIN_TIME, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.INT64, minTimeAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT64, minTimeAccumulator.getFinalType());
+
+    minTimeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertTrue(minTimeAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new LongColumnBuilder(null, 1);
+    minTimeAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(0, intermediateResult[0].build().getLong(0));
+
+    // add intermediate result as input
+    minTimeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+    minTimeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(0, finalResult.build().getLong(0));
+
+    minTimeAccumulator.reset();
+    minTimeAccumulator.addStatistics(statistics);
+    finalResult = new LongColumnBuilder(null, 1);
+    minTimeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100, finalResult.build().getLong(0));
+  }
+
+  @Test
+  public void maxValueAccumulatorTest() {
+    Accumulator extremeAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.MAX_VALUE, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+    extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(extremeAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+
+    // add intermediate result as input
+    extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(99d, finalResult.build().getDouble(0), 0.001);
+
+    extremeAccumulator.reset();
+    extremeAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void minValueAccumulatorTest() {
+    Accumulator extremeAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.MIN_VALUE, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+    extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(extremeAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(0d, intermediateResult[0].build().getDouble(0), 0.001);
+
+    // add intermediate result as input
+    extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(0d, finalResult.build().getDouble(0), 0.001);
+
+    extremeAccumulator.reset();
+    extremeAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    extremeAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+
+  @Test
+  public void sumAccumulatorTest() {
+    Accumulator sumAccumulator =
+        AccumulatorFactory.createAccumulator(AggregationType.SUM, TSDataType.DOUBLE, true);
+    Assert.assertEquals(TSDataType.DOUBLE, sumAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.DOUBLE, sumAccumulator.getFinalType());
+
+    sumAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+    Assert.assertFalse(sumAccumulator.hasFinalResult());
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+    sumAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertEquals(4950d, intermediateResult[0].build().getDouble(0), 0.001);
+
+    // add intermediate result as input
+    sumAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+    sumAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(9900d, finalResult.build().getDouble(0), 0.001);
+
+    sumAccumulator.reset();
+    sumAccumulator.addStatistics(statistics);
+    finalResult = new DoubleColumnBuilder(null, 1);
+    sumAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
index 7b3a158c7e..36acc04562 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -33,6 +35,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -86,9 +89,12 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -103,8 +109,11 @@ public class SeriesAggregateScanOperatorTest {
     List<AggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -124,8 +133,40 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
+      throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -142,10 +183,13 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithTimeFilter1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     Filter timeFilter = TimeFilter.gtEq(120);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -158,9 +202,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter2() throws IllegalPathException {
     Filter timeFilter = TimeFilter.ltEq(379);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -173,9 +220,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter3() throws IllegalPathException {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -194,9 +244,12 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -215,9 +268,12 @@ public class SeriesAggregateScanOperatorTest {
   public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
     int[] result = new int[] {100, 100, 100, 100};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -233,12 +289,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT),
-            timeFilter,
-            true,
-            groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -264,8 +320,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -279,13 +338,49 @@ public class SeriesAggregateScanOperatorTest {
     assertEquals(4, count);
   }
 
+  @Test
+  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][3 - count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][3 - count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][3 - count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][3 - count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
   @Test
   public void testGroupBySlidingTimeWindow() throws IllegalPathException {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -301,9 +396,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -330,8 +428,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -346,7 +447,7 @@ public class SeriesAggregateScanOperatorTest {
   }
 
   public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter)
@@ -373,7 +474,7 @@ public class SeriesAggregateScanOperatorTest {
             measurementPath,
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
-            aggregateFuncList,
+            aggregators,
             timeFilter,
             ascending,
             groupByTimeParameter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index a5657d74fe..d3ffe65ff3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -92,12 +92,6 @@ public class TsBlock {
     }
   }
 
-  public boolean hasNext() {
-    return false;
-  }
-
-  public void next() {}
-
   public int getPositionCount() {
     return positionCount;
   }
@@ -170,6 +164,23 @@ public class TsBlock {
     return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
   }
 
+  /**
+   * This method will create a temporary view of origin tsBlock, which will reuse the arrays of
+   * columns but with different offset. It can be used where you want to skip some points when
+   * getting iterator.
+   */
+  public TsBlock subTsBlock(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("FromIndex of subTsBlock cannot over positionCount.");
+    }
+    TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex);
+    Column[] subValueColumns = new Column[valueColumns.length];
+    for (int i = 0; i < subValueColumns.length; i++) {
+      subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
+    }
+    return new TsBlock(subTimeColumn, subValueColumns);
+  }
+
   public long getTimeByIndex(int index) {
     return timeColumn.getLong(index);
   }
@@ -186,6 +197,21 @@ public class TsBlock {
     return valueColumns[columnIndex];
   }
 
+  public Column[] getTimeAndValueColumn(int columnIndex) {
+    Column[] columns = new Column[2];
+    columns[0] = getTimeColumn();
+    columns[1] = getColumn(columnIndex);
+    return columns;
+  }
+
+  public Column[] getColumns(int[] columnIndexes) {
+    Column[] columns = new Column[columnIndexes.length];
+    for (int i = 0; i < columnIndexes.length; i++) {
+      columns[i] = valueColumns[columnIndexes[i]];
+    }
+    return columns;
+  }
+
   public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
     return new TsBlockSingleColumnIterator(0);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index f6b2c50e66..80fba8b7d2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -123,6 +123,15 @@ public class BinaryColumn implements Column {
     return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BinaryColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 23dd445a34..4fab293acb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -122,6 +122,15 @@ public class BooleanColumn implements Column {
     return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BooleanColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index c9ad6eddb3..2e796a5f68 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -103,6 +103,12 @@ public interface Column {
    */
   Column getRegion(int positionOffset, int length);
 
+  /**
+   * This method will create a temporary view of origin column, which will reuse the array of column
+   * but with different array offset.
+   */
+  Column subColumn(int fromIndex);
+
   /** reverse the column */
   void reverse();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index a9e64e4224..25c97b188f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -122,6 +122,15 @@ public class DoubleColumn implements Column {
     return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new DoubleColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index efa1243028..b1831d15cb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -121,6 +121,14 @@ public class FloatColumn implements Column {
     return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 5ba777c137..f2556c75ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -121,6 +121,14 @@ public class IntColumn implements Column {
     return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 4fd1fadbb2..e4364a7072 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -121,6 +121,14 @@ public class LongColumn implements Column {
     return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index d82d7cf464..b33995ee92 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -147,6 +147,14 @@ public class RunLengthEncodedColumn implements Column {
     return new RunLengthEncodedColumn(value, length);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new RunLengthEncodedColumn(value, positionCount - fromIndex);
+  }
+
   @Override
   public void reverse() {
     // do nothing because the underlying column has only one value
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index 2f8176b699..3f8db4f31d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,14 @@ public class TimeColumn implements Column {
     return new TimeColumn(positionOffset + arrayOffset, length, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, values);
+  }
+
   @Override
   public void reverse() {
     for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 7bfd218374..42d8c96cfa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable {
   /**
    * get tsPrimitiveType by resultDataType.
    *
-   * @param dataType -given TsDataType
-   * @param v -
+   * @param dataType given TsDataType
+   */
+  public static TsPrimitiveType getByType(TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return new TsPrimitiveType.TsBoolean();
+      case INT32:
+        return new TsPrimitiveType.TsInt();
+      case INT64:
+        return new TsPrimitiveType.TsLong();
+      case FLOAT:
+        return new TsPrimitiveType.TsFloat();
+      case DOUBLE:
+        return new TsPrimitiveType.TsDouble();
+      case TEXT:
+        return new TsPrimitiveType.TsBinary();
+      case VECTOR:
+        return new TsPrimitiveType.TsVector();
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
+    }
+  }
+
+  /**
+   * get tsPrimitiveType by resultDataType and initial value.
+   *
+   * @param dataType given TsDataType
+   * @param v initial value
    */
   public static TsPrimitiveType getByType(TSDataType dataType, Object v) {
     switch (dataType) {
@@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable {
     throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
   }
 
+  public abstract void setObject(Object val);
+
+  public abstract void reset();
+
   /**
    * get the size of one instance of current class.
    *
@@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private boolean value;
 
+    public TsBoolean() {}
+
     public TsBoolean(boolean value) {
       this.value = value;
     }
@@ -156,6 +188,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Boolean) {
+        setBoolean((Boolean) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsBoolean can only be set Boolean value");
+    }
+
+    @Override
+    public void reset() {
+      value = false;
+    }
+
     @Override
     public int getSize() {
       return 4 + 1;
@@ -198,6 +244,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private int value;
 
+    public TsInt() {}
+
     public TsInt(int value) {
       this.value = value;
     }
@@ -212,6 +260,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Integer) {
+        setInt((Integer) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -254,6 +316,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private long value;
 
+    public TsLong() {}
+
     public TsLong(long value) {
       this.value = value;
     }
@@ -268,6 +332,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Long) {
+        setLong((Long) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsLong can only be set Long value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -310,6 +388,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private float value;
 
+    public TsFloat() {}
+
     public TsFloat(float value) {
       this.value = value;
     }
@@ -324,6 +404,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Float) {
+        setFloat((Float) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsFloat can only be set float value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -366,6 +460,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private double value;
 
+    public TsDouble() {}
+
     public TsDouble(double value) {
       this.value = value;
     }
@@ -380,6 +476,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Double) {
+        setDouble((Double) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0.0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -422,6 +532,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private Binary value;
 
+    public TsBinary() {}
+
     public TsBinary(Binary value) {
       this.value = value;
     }
@@ -436,6 +548,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Binary) {
+        setBinary((Binary) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
+    }
+
+    @Override
+    public void reset() {
+      value = null;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4 + value.getLength();
@@ -478,6 +604,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private TsPrimitiveType[] values;
 
+    public TsVector() {}
+
     public TsVector(TsPrimitiveType[] values) {
       this.values = values;
     }
@@ -492,6 +620,20 @@ public abstract class TsPrimitiveType implements Serializable {
       this.values = vals;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof TsPrimitiveType[]) {
+        setVector((TsPrimitiveType[]) val);
+        return;
+      }
+      throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
+    }
+
+    @Override
+    public void reset() {
+      values = null;
+    }
+
     @Override
     public int getSize() {
       int size = 0;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
index 75d28596e9..669d3a41ad 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.block;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
@@ -30,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -322,4 +324,51 @@ public class TsBlockTest {
       }
     }
   }
+
+  @Test
+  public void testSubTsBlock() {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    for (int i = 0; i < 10; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      builder.getColumnBuilder(0).writeInt(i);
+      builder.declarePosition();
+    }
+    TsBlock tsBlock = builder.build();
+    TsBlockSingleColumnIterator iterator = tsBlock.getTsBlockSingleColumnIterator();
+    int index = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subTsBlock from TsBlock, offset = 3
+    int offset = 3;
+    TsBlock subTsBlock = tsBlock.subTsBlock(offset);
+    iterator = subTsBlock.getTsBlockSingleColumnIterator();
+    index = offset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subSubTsBlock from subTsBlock, offset = 2
+    int nextOffset = 2;
+    TsBlock subSubTsBlock = subTsBlock.subTsBlock(nextOffset);
+    iterator = subSubTsBlock.getTsBlockSingleColumnIterator();
+    index = offset + nextOffset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    try {
+      subSubTsBlock.subTsBlock(3);
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("FromIndex of subTsBlock cannot over positionCount."));
+    }
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
new file mode 100644
index 0000000000..f21df099f7
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnTest {
+
+  @Test
+  public void timeColumnSubColumnTest() {
+    TimeColumnBuilder columnBuilder = new TimeColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    TimeColumn timeColumn = (TimeColumn) columnBuilder.build();
+    timeColumn = (TimeColumn) timeColumn.subColumn(5);
+    Assert.assertEquals(5, timeColumn.getPositionCount());
+    Assert.assertEquals(5, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(4));
+    try {
+      timeColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    timeColumn = (TimeColumn) timeColumn.subColumn(3);
+    Assert.assertEquals(2, timeColumn.getPositionCount());
+    Assert.assertEquals(8, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(1));
+    try {
+      timeColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      timeColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void binaryColumnSubColumnTest() {
+    BinaryColumnBuilder columnBuilder = new BinaryColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBinary(Binary.valueOf(String.valueOf(i)));
+    }
+    BinaryColumn binaryColumn = (BinaryColumn) columnBuilder.build();
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(5);
+    Assert.assertEquals(5, binaryColumn.getPositionCount());
+    Assert.assertEquals("5", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(4).toString());
+    try {
+      binaryColumn.getBinary(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(3);
+    Assert.assertEquals(2, binaryColumn.getPositionCount());
+    Assert.assertEquals("8", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(1).toString());
+    try {
+      binaryColumn.getBinary(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      binaryColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void booleanColumnSubColumnTest() {
+    BooleanColumnBuilder columnBuilder = new BooleanColumnBuilder(null, 10);
+    // 0: true, 1: false
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBoolean(i % 2 == 0);
+    }
+    BooleanColumn booleanColumn = (BooleanColumn) columnBuilder.build();
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(5);
+    Assert.assertEquals(5, booleanColumn.getPositionCount());
+    Assert.assertFalse(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(4));
+    try {
+      booleanColumn.getBoolean(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(3);
+    Assert.assertEquals(2, booleanColumn.getPositionCount());
+    Assert.assertTrue(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(1));
+    try {
+      booleanColumn.getBoolean(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      booleanColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void doubleColumnSubColumnTest() {
+    DoubleColumnBuilder columnBuilder = new DoubleColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeDouble(i);
+    }
+    DoubleColumn doubleColumn = (DoubleColumn) columnBuilder.build();
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(5);
+    Assert.assertEquals(5, doubleColumn.getPositionCount());
+    Assert.assertEquals(5.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(4), 0.001);
+    try {
+      doubleColumn.getDouble(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(3);
+    Assert.assertEquals(2, doubleColumn.getPositionCount());
+    Assert.assertEquals(8.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(1), 0.001);
+    try {
+      doubleColumn.getDouble(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      doubleColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void floatColumnSubColumnTest() {
+    FloatColumnBuilder columnBuilder = new FloatColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeFloat(i);
+    }
+    FloatColumn floatColumn = (FloatColumn) columnBuilder.build();
+    floatColumn = (FloatColumn) floatColumn.subColumn(5);
+    Assert.assertEquals(5, floatColumn.getPositionCount());
+    Assert.assertEquals(5.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(4), 0.001);
+    try {
+      floatColumn.getFloat(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    floatColumn = (FloatColumn) floatColumn.subColumn(3);
+    Assert.assertEquals(2, floatColumn.getPositionCount());
+    Assert.assertEquals(8.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(1), 0.001);
+    try {
+      floatColumn.getFloat(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      floatColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void intColumnSubColumnTest() {
+    IntColumnBuilder columnBuilder = new IntColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeInt(i);
+    }
+    IntColumn intColumn = (IntColumn) columnBuilder.build();
+    intColumn = (IntColumn) intColumn.subColumn(5);
+    Assert.assertEquals(5, intColumn.getPositionCount());
+    Assert.assertEquals(5, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(4));
+    try {
+      intColumn.getInt(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    intColumn = (IntColumn) intColumn.subColumn(3);
+    Assert.assertEquals(2, intColumn.getPositionCount());
+    Assert.assertEquals(8, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(1));
+    try {
+      intColumn.getInt(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      intColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void longColumnSubColumnTest() {
+    LongColumnBuilder columnBuilder = new LongColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    LongColumn longColumn = (LongColumn) columnBuilder.build();
+    longColumn = (LongColumn) longColumn.subColumn(5);
+    Assert.assertEquals(5, longColumn.getPositionCount());
+    Assert.assertEquals(5, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(4));
+    try {
+      longColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    longColumn = (LongColumn) longColumn.subColumn(3);
+    Assert.assertEquals(2, longColumn.getPositionCount());
+    Assert.assertEquals(8, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(1));
+    try {
+      longColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      longColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void runLengthEncodedColumnSubColumnTest() {
+    LongColumnBuilder longColumnBuilder = new LongColumnBuilder(null, 1);
+    longColumnBuilder.writeLong(1);
+    RunLengthEncodedColumn column = new RunLengthEncodedColumn(longColumnBuilder.build(), 10);
+    column = (RunLengthEncodedColumn) column.subColumn(5);
+    Assert.assertEquals(5, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(4));
+    try {
+      column.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    column = (RunLengthEncodedColumn) column.subColumn(3);
+    Assert.assertEquals(2, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(1));
+    try {
+      column.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      column.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+}