You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/01 08:56:54 UTC

[iotdb] 05/09: implement part accumulator

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1065953784e6c2ed8aabf10ef255f2a0efb05394
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 17:00:09 2022 +0800

    implement part accumulator
---
 .../db/mpp/operator/aggregation/Accumulator.java   |  27 +++-
 ...regatorFactory.java => AccumulatorFactory.java} |  26 +++-
 .../db/mpp/operator/aggregation/Aggregator.java    |  18 +--
 .../mpp/operator/aggregation/AvgAccumulator.java   |  76 +++++++++--
 .../mpp/operator/aggregation/CountAccumulator.java |  20 ++-
 .../operator/aggregation/ExtremeAccumulator.java   | 142 +++++++++++++++++++++
 ...atorFactory.java => FirstValueAccumulator.java} |   5 +-
 ...gatorFactory.java => LastValueAccumulator.java} |   5 +-
 ...regatorFactory.java => MaxTimeAccumulator.java} |   5 +-
 ...ntAccumulator.java => MaxValueAccumulator.java} |  52 ++++++--
 ...regatorFactory.java => MinTimeAccumulator.java} |   5 +-
 ...ntAccumulator.java => MinValueAccumulator.java} |  54 ++++++--
 .../{CountAccumulator.java => SumAccumulator.java} |  70 ++++++++--
 .../source/SeriesAggregateScanOperator.java        |  26 ++--
 .../plan/parameter/AggregationDescriptor.java      |   4 +
 .../operator/SeriesAggregateScanOperatorTest.java  |  45 ++++---
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 139 +++++++++++++++++++-
 17 files changed, 606 insertions(+), 113 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index f6f268aad1..5833a75959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -25,20 +26,44 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public interface Accumulator {
 
-  // Column should be like: | Time | Value |
+  /** Column should be like: | Time | Value | */
   void addInput(Column[] column, TimeRange timeRange);
 
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single; But for AVG,
+   * last_value, it should be double column with dictionary order.
+   */
   void addIntermediate(Column[] partialResult);
 
+  /**
+   * This method can only be used in seriesAggregateScanOperator, it will use different statistics
+   * based on the type of Accumulator.
+   */
   void addStatistics(Statistics statistics);
 
+  /**
+   * Attention: setFinal should be invoked only once, and addInput() and addIntermediate() are not
+   * allowed again.
+   */
   void setFinal(Column finalResult);
 
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single, so its output column
+   * is single too; But for AVG, last_value, it should be double column with dictionary order.
+   */
   void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
 
+  /** Final result is single column for any aggregation function. */
   void outputFinal(ColumnBuilder tsBlockBuilder);
 
   void reset();
 
+  /**
+   * For first_value or last_value in decreasing order, we can get final result by the first record.
+   */
   boolean hasFinalResult();
+
+  TSDataType[] getIntermediateType();
+
+  TSDataType getFinalType();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index fb49e8c8c1..1eb421c4e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -19,4 +19,28 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
-public class AggregatorFactory {}
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class AccumulatorFactory {
+
+  public static Accumulator createAccumulator(
+      AggregationType aggregationType, TSDataType tsDataType) {
+    switch (aggregationType) {
+      case COUNT:
+        return new CountAccumulator();
+      case AVG:
+        return new AvgAccumulator(tsDataType);
+      case SUM:
+      case EXTREME:
+      case MAX_TIME:
+      case MIN_TIME:
+      case MAX_VALUE:
+      case MIN_VALUE:
+      case LAST_VALUE:
+      case FIRST_VALUE:
+      default:
+        return null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 8a05f460c5..a808c734bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -36,22 +36,14 @@ public class Aggregator {
   // In some intermediate result input, inputLocation[] should include two columns
   private final List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
-  private final TSDataType intermediateType;
-  private final TSDataType finalType;
 
-  private TimeRange timeRange;
+  private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
 
   public Aggregator(
-      Accumulator accumulator,
-      AggregationStep step,
-      List<InputLocation[]> inputLocationList,
-      TSDataType intermediateType,
-      TSDataType finalType) {
+      Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
     this.accumulator = accumulator;
     this.step = step;
     this.inputLocationList = inputLocationList;
-    this.intermediateType = intermediateType;
-    this.finalType = finalType;
   }
 
   // Used for SeriesAggregateScanOperator
@@ -96,11 +88,11 @@ public class Aggregator {
     accumulator.addStatistics(statistics);
   }
 
-  public TSDataType getOutputType() {
+  public TSDataType[] getOutputType() {
     if (step.isOutputPartial()) {
-      return intermediateType;
+      return new TSDataType[] {accumulator.getFinalType()};
     } else {
-      return finalType;
+      return accumulator.getIntermediateType();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
index dc993a65c5..0898eefcb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.operator.aggregation;
 
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
 public class AvgAccumulator implements Accumulator {
 
@@ -32,23 +34,60 @@ public class AvgAccumulator implements Accumulator {
   private long countValue;
   private double sumValue;
 
+  public AvgAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
   @Override
-  public void addInput(Column[] column, TimeRange timeRange) {}
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      countValue++;
+      updateSumValue(column[1].getObject(i));
+    }
+  }
 
+  // partialResult should be like: | countValue1 | sumValue1 |
   @Override
-  public void addIntermediate(Column[] partialResult) {}
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of Avg should be 2");
+    }
+    countValue += partialResult[0].getLong(0);
+    updateSumValue(partialResult[1].getObject(0));
+  }
 
   @Override
-  public void addStatistics(Statistics statistics) {}
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
+  }
 
+  // Set sumValue to finalResult and keep countValue equals to 1
   @Override
-  public void setFinal(Column finalResult) {}
+  public void setFinal(Column finalResult) {
+    reset();
+    updateSumValue(finalResult.getObject(0));
+  }
 
   @Override
-  public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {}
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(countValue);
+    columnBuilders[1].writeDouble(sumValue);
+  }
 
   @Override
-  public void outputFinal(ColumnBuilder tsBlockBuilder) {}
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeDouble(sumValue / countValue);
+  }
 
   @Override
   public void reset() {
@@ -61,26 +100,35 @@ public class AvgAccumulator implements Accumulator {
     return false;
   }
 
-  private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
-    double val;
-    switch (type) {
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+    switch (seriesDataType) {
       case INT32:
-        val = (int) sumVal;
+        sumValue += (int) sumVal;
         break;
       case INT64:
-        val = (long) sumVal;
+        sumValue = (long) sumVal;
         break;
       case FLOAT:
-        val = (float) sumVal;
+        sumValue = (float) sumVal;
         break;
       case DOUBLE:
-        val = (double) sumVal;
+        sumValue = (double) sumVal;
         break;
       case TEXT:
       case BOOLEAN:
       default:
         throw new UnSupportedDataTypeException(
-            String.format("Unsupported data type in aggregation AVG : %s", type));
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
index db266b55e4..84dae3f986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -29,6 +30,8 @@ public class CountAccumulator implements Accumulator {
 
   private long countValue = 0;
 
+  public CountAccumulator() {}
+
   // Column should be like: | Time | Value |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
@@ -42,12 +45,13 @@ public class CountAccumulator implements Accumulator {
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialCountValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of Count should be 1");
     }
+    countValue += partialResult[0].getLong(0);
   }
 
   @Override
@@ -81,4 +85,14 @@ public class CountAccumulator implements Accumulator {
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
new file mode 100644
index 0000000000..599feec646
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class ExtremeAccumulator implements Accumulator {
+
+  private TsPrimitiveType extremeResult;
+  private boolean hasCandidateResult;
+
+  public ExtremeAccumulator(TSDataType seriesDataType) {
+    this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      updateResult((Comparable<Object>) column[1].getObject(i));
+    }
+  }
+
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of ExtremeValue should be 1");
+    }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
+    Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+
+    Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal);
+    Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal);
+
+    Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal;
+    updateResult(extVal);
+  }
+
+  @Override
+  public void setFinal(Column finalResult) {
+    extremeResult.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be single in ExtremeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(extremeResult.getValue());
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(extremeResult.getValue());
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    extremeResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {extremeResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return extremeResult.getDataType();
+  }
+
+  public Object getAbsValue(Object v) {
+    switch (extremeResult.getDataType()) {
+      case DOUBLE:
+        return Math.abs((Double) v);
+      case FLOAT:
+        return Math.abs((Float) v);
+      case INT32:
+        return Math.abs((Integer) v);
+      case INT64:
+        return Math.abs((Long) v);
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(extremeResult.getDataType()));
+    }
+  }
+
+  private void updateResult(Comparable<Object> extVal) {
+    if (extVal == null) {
+      return;
+    }
+
+    Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+    Comparable<Object> candidateResult = (Comparable<Object>) extremeResult.getValue();
+    Comparable<Object> absCandidateResult =
+        (Comparable<Object>) getAbsValue(extremeResult.getValue());
+
+    if (!hasCandidateResult
+        || (absExtVal.compareTo(absCandidateResult) > 0
+            || (absExtVal.compareTo(absCandidateResult) == 0
+                && extVal.compareTo(candidateResult) > 0))) {
+      hasCandidateResult = true;
+      extremeResult.setObject(extVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index fb49e8c8c1..49ad6a5133 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class FirstValueAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index fb49e8c8c1..97183aae52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class LastValueAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
similarity index 90%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index fb49e8c8c1..c5d03f8442 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class MaxTimeAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
index db266b55e4..bb3f1d880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
@@ -19,15 +19,22 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-public class CountAccumulator implements Accumulator {
+public class MaxValueAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TsPrimitiveType maxResult;
+  private boolean hasCandidateResult;
+
+  public MaxValueAccumulator(TSDataType seriesDataType) {
+    this.maxResult = TsPrimitiveType.getByType(seriesDataType);
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateResult((Comparable<Object>) column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialMaxValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MaxValue should be 1");
     }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    Comparable<Object> maxValue = (Comparable<Object>) statistics.getMaxValue();
+    updateResult(maxValue);
   }
 
   // finalResult should be single column, like: | finalCountValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    maxResult.setObject(finalResult.getObject(0));
   }
 
   // columnBuilder should be single in countAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeObject(maxResult.getValue());
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeObject(maxResult.getValue());
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    hasCandidateResult = false;
+    this.maxResult.reset();
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {maxResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return maxResult.getDataType();
+  }
+
+  private void updateResult(Comparable<Object> minVal) {
+    if (minVal == null) {
+      return;
+    }
+    if (!hasCandidateResult || minVal.compareTo(maxResult.getValue()) > 0) {
+      hasCandidateResult = true;
+      maxResult.setObject(minVal);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index fb49e8c8c1..95bf611acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class MinTimeAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
index db266b55e4..97f46724ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
@@ -19,15 +19,22 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-public class CountAccumulator implements Accumulator {
+public class MinValueAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TsPrimitiveType minResult;
+  private boolean hasCandidateResult;
+
+  public MinValueAccumulator(TSDataType seriesDataType) {
+    this.minResult = TsPrimitiveType.getByType(seriesDataType);
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateResult((Comparable<Object>) column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialMinValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MinValue should be 1");
     }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+    updateResult(minVal);
   }
 
   // finalResult should be single column, like: | finalCountValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    minResult.setObject(finalResult.getObject(0));
   }
 
-  // columnBuilder should be single in countAccumulator
+  // columnBuilder should be single in MinValueAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeObject(minResult.getValue());
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeObject(minResult.getValue());
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    hasCandidateResult = false;
+    this.minResult.reset();
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {minResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return minResult.getDataType();
+  }
+
+  private void updateResult(Comparable<Object> minVal) {
+    if (minVal == null) {
+      return;
+    }
+    if (!hasCandidateResult || minVal.compareTo(minResult.getValue()) < 0) {
+      hasCandidateResult = true;
+      minResult.setObject(minVal);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
index db266b55e4..7dd3896b1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
@@ -19,15 +19,23 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
-public class CountAccumulator implements Accumulator {
+public class SumAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TSDataType seriesDataType;
+  private double sumValue = 0;
+
+  public SumAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +46,85 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateSumValue(column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialSumValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of Sum should be 1");
     }
+    updateSumValue(partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
   }
 
-  // finalResult should be single column, like: | finalCountValue |
+  // finalResult should be single column, like: | finalSumValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    reset();
+    updateSumValue(finalResult.getObject(0));
   }
 
   // columnBuilder should be single in countAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeDouble(sumValue);
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeDouble(sumValue);
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    this.sumValue = 0;
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+    switch (seriesDataType) {
+      case INT32:
+        sumValue += (int) sumVal;
+        break;
+      case INT64:
+        sumValue = (long) sumVal;
+        break;
+      case FLOAT:
+        sumValue = (float) sumVal;
+        break;
+      case DOUBLE:
+        sumValue = (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 8966b1f57b..b679febcca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -39,9 +40,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
@@ -94,9 +96,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             null,
             ascending);
     this.aggregators = aggregators;
-    tsBlockBuilder =
-        new TsBlockBuilder(
-            aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList()));
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
   }
 
@@ -216,10 +220,14 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(curTimeRange.getMin());
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = 0; i < aggregators.size(); i++) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[1];
-      columnBuilder[0] = columnBuilders[i];
-      aggregators.get(i).outputResult(columnBuilder);
+    int columnIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
     resultTsBlock = tsBlockBuilder.build();
@@ -261,7 +269,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   @SuppressWarnings("squid:S3776")
   private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (!satisfied(tsBlock, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
       return;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
index 244fbcf7b8..8f3dbff4fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
@@ -59,6 +59,10 @@ public class AggregationDescriptor {
     return aggregationType;
   }
 
+  public AggregationStep getStep() {
+    return step;
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     step.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index bfaced0670..c21314b5d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -30,10 +30,12 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -89,7 +91,14 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithoutTimeFilter() throws IllegalPathException {
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, null);
+            Collections.singletonList(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32),
+                    AggregationStep.SINGLE,
+                    null)),
+            null,
+            true,
+            null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -105,7 +114,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(null, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -126,7 +135,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(null, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -145,8 +154,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter1() throws IllegalPathException {
     Filter timeFilter = TimeFilter.gtEq(120);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -160,8 +168,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter2() throws IllegalPathException {
     Filter timeFilter = TimeFilter.ltEq(379);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -175,8 +182,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter3() throws IllegalPathException {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -197,7 +203,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -217,8 +223,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {100, 100, 100, 100};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -235,11 +240,7 @@ public class SeriesAggregateScanOperatorTest {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT),
-            timeFilter,
-            true,
-            groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -266,7 +267,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -285,8 +286,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -303,8 +303,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -332,7 +331,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 7bfd218374..8251fe9438 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable {
   /**
    * get tsPrimitiveType by resultDataType.
    *
-   * @param dataType -given TsDataType
-   * @param v -
+   * @param dataType given TsDataType
+   */
+  public static TsPrimitiveType getByType(TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return new TsPrimitiveType.TsBoolean();
+      case INT32:
+        return new TsPrimitiveType.TsInt();
+      case INT64:
+        return new TsPrimitiveType.TsLong();
+      case FLOAT:
+        return new TsPrimitiveType.TsFloat();
+      case DOUBLE:
+        return new TsPrimitiveType.TsDouble();
+      case TEXT:
+        return new TsPrimitiveType.TsBinary();
+      case VECTOR:
+        return new TsPrimitiveType.TsVector();
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
+    }
+  }
+
+  /**
+   * get tsPrimitiveType by resultDataType and initial value.
+   *
+   * @param dataType given TsDataType
+   * @param v initial value
    */
   public static TsPrimitiveType getByType(TSDataType dataType, Object v) {
     switch (dataType) {
@@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable {
     throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
   }
 
+  public abstract void setObject(Object val);
+
+  public abstract void reset();
+
   /**
    * get the size of one instance of current class.
    *
@@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private boolean value;
 
+    public TsBoolean() {}
+
     public TsBoolean(boolean value) {
       this.value = value;
     }
@@ -156,6 +188,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Binary) {
+        setBinary((Binary) val);
+      }
+      throw new UnSupportedDataTypeException("TsBoolean can only be set Binary value");
+    }
+
+    @Override
+    public void reset() {
+      value = false;
+    }
+
     @Override
     public int getSize() {
       return 4 + 1;
@@ -198,6 +243,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private int value;
 
+    public TsInt() {}
+
     public TsInt(int value) {
       this.value = value;
     }
@@ -212,6 +259,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Integer) {
+        setInt((Integer) val);
+      }
+      throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -254,6 +314,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private long value;
 
+    public TsLong() {}
+
     public TsLong(long value) {
       this.value = value;
     }
@@ -268,6 +330,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Long) {
+        setLong((Long) val);
+      }
+      throw new UnSupportedDataTypeException("TsLong can only be set Long value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -310,6 +385,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private float value;
 
+    public TsFloat() {}
+
     public TsFloat(float value) {
       this.value = value;
     }
@@ -324,6 +401,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Float) {
+        setFloat((Float) val);
+      }
+      throw new UnSupportedDataTypeException("TsFloat can only be set float value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -366,6 +456,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private double value;
 
+    public TsDouble() {}
+
     public TsDouble(double value) {
       this.value = value;
     }
@@ -380,6 +472,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Double) {
+        setDouble((Double) val);
+      }
+      throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0.0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -422,6 +527,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private Binary value;
 
+    public TsBinary() {}
+
     public TsBinary(Binary value) {
       this.value = value;
     }
@@ -436,6 +543,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Binary) {
+        setBinary((Binary) val);
+      }
+      throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
+    }
+
+    @Override
+    public void reset() {
+      value = null;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4 + value.getLength();
@@ -478,6 +598,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private TsPrimitiveType[] values;
 
+    public TsVector() {}
+
     public TsVector(TsPrimitiveType[] values) {
       this.values = values;
     }
@@ -492,6 +614,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.values = vals;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof TsPrimitiveType[]) {
+        setVector((TsPrimitiveType[]) val);
+      }
+      throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
+    }
+
+    @Override
+    public void reset() {
+      values = null;
+    }
+
     @Override
     public int getSize() {
       int size = 0;