You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/09/29 01:39:15 UTC

[iotdb] branch master updated: [IOTDB-1638] Support vector in aggregation query (#4005)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e54838  [IOTDB-1638] Support vector in aggregation query (#4005)
8e54838 is described below

commit 8e548380c14ce9fa85cd1bb8556c03becfa7199b
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Sep 29 09:38:02 2021 +0800

    [IOTDB-1638] Support vector in aggregation query (#4005)
---
 .../cluster/query/ClusterPhysicalGenerator.java    |   7 +-
 .../iotdb/cluster/query/ClusterQueryRouter.java    |   5 +-
 .../query/aggregate/ClusterAggregateExecutor.java  |  30 +-
 .../query/ClusterAggregateExecutorTest.java        |   8 +-
 .../iotdb/AlignedTimeseriesSessionExample.java     |   7 +
 .../db/engine/cache/TimeSeriesMetadataCache.java   |   6 +-
 .../engine/compaction/utils/CompactionUtils.java   |   7 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  17 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |  33 +-
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   5 -
 .../db/qp/physical/crud/AlignByDevicePlan.java     |   5 +
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |   5 -
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |   5 -
 .../iotdb/db/qp/physical/crud/QueryIndexPlan.java  |   5 -
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  10 -
 .../db/qp/physical/crud/RawDataQueryPlan.java      |  32 +-
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |   5 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   7 +-
 .../db/query/aggregation/AggregateResult.java      |  10 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  17 +-
 .../db/query/aggregation/impl/CountAggrResult.java |  15 +-
 .../query/aggregation/impl/ExtremeAggrResult.java  |  19 +-
 .../aggregation/impl/FirstValueAggrResult.java     |  25 +-
 .../aggregation/impl/FirstValueDescAggrResult.java |  17 +-
 .../aggregation/impl/LastValueAggrResult.java      |  21 +-
 .../aggregation/impl/LastValueDescAggrResult.java  |  17 +-
 .../query/aggregation/impl/MaxTimeAggrResult.java  |  19 +-
 .../aggregation/impl/MaxTimeDescAggrResult.java    |  13 +-
 .../query/aggregation/impl/MaxValueAggrResult.java |  21 +-
 .../query/aggregation/impl/MinTimeAggrResult.java  |  17 +-
 .../aggregation/impl/MinTimeDescAggrResult.java    |  11 +-
 .../query/aggregation/impl/MinValueAggrResult.java |  19 +-
 .../db/query/aggregation/impl/SumAggrResult.java   |  17 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |  15 +-
 .../dataset/groupby/LocalGroupByExecutor.java      |  14 +-
 .../db/query/executor/AggregationExecutor.java     | 377 ++++++++++++++++++---
 .../iotdb/db/query/executor/QueryRouter.java       |  11 +-
 .../db/query/reader/series/IAggregateReader.java   |   2 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  38 +++
 .../reader/series/VectorSeriesAggregateReader.java | 177 ++++++++++
 .../reader/universal/DescPriorityMergeReader.java  |   9 +-
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |  18 -
 .../reader/series/SeriesAggregateReaderTest.java   |  10 +-
 .../session/IoTDBSessionVectorABDeviceIT.java      | 231 +++++++++++++
 .../session/IoTDBSessionVectorAggregationIT.java   | 266 +++++++++++++++
 .../IoTDBSessionVectorAggregationWithUnSeqIT.java  | 192 +++++++++++
 ...ctorIT.java => IoTDBSessionVectorInsertIT.java} |  15 +-
 .../tsfile/file/metadata/VectorChunkMetadata.java  |   4 +
 .../file/metadata/VectorTimeSeriesMetadata.java    |   4 +
 .../apache/iotdb/tsfile/read/common/BatchData.java |  84 ++++-
 .../tsfile/read/common/DescReadWriteBatchData.java |  42 +++
 .../tsfile/read/common/IBatchDataIterator.java     |  35 ++
 .../tsfile/read/reader/BatchDataIterator.java      |  54 ---
 .../tsfile/read/reader/page/VectorPageReader.java  |   6 +-
 54 files changed, 1648 insertions(+), 413 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
index 0dcda7e..80a6a94 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurati
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 public class ClusterPhysicalGenerator extends PhysicalGenerator {
@@ -67,9 +65,8 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
   }
 
   @Override
-  public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
-      throws MetadataException {
-    return getCMManager().getSeriesSchemas(paths);
+  public List<PartialPath> groupVectorPaths(List<PartialPath> paths) throws MetadataException {
+    return getCMManager().groupVectorPaths(paths);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index e3be92c..eb45353 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -78,8 +78,9 @@ public class ClusterQueryRouter extends QueryRouter {
   }
 
   @Override
-  protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
-    return new ClusterAggregateExecutor(aggregationPlan, metaGroupMember);
+  protected AggregationExecutor getAggregationExecutor(
+      QueryContext context, AggregationPlan aggregationPlan) {
+    return new ClusterAggregateExecutor(context, aggregationPlan, metaGroupMember);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index a007ee9..af12e5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class ClusterAggregateExecutor extends AggregationExecutor {
@@ -51,8 +50,9 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
    *
    * @param aggregationPlan
    */
-  public ClusterAggregateExecutor(AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
-    super(aggregationPlan);
+  public ClusterAggregateExecutor(
+      QueryContext context, AggregationPlan aggregationPlan, MetaGroupMember metaMember) {
+    super(context, aggregationPlan);
     this.metaMember = metaMember;
     this.readerFactory = new ClusterReaderFactory(metaMember);
     this.aggregator = new ClusterAggregator(metaMember);
@@ -60,24 +60,28 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
 
   @Override
   protected void aggregateOneSeries(
-      Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
-      AggregateResult[] aggregateResultList,
-      Set<String> measurements,
-      Filter timeFilter,
-      QueryContext context)
+      PartialPath seriesPath,
+      List<Integer> indexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
       throws StorageEngineException {
-    PartialPath seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
+    TSDataType tsDataType = dataTypes.get(indexes.get(0));
     List<String> aggregationNames = new ArrayList<>();
 
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregationNames.add(aggregations.get(i));
     }
     List<AggregateResult> aggregateResult =
         aggregator.getAggregateResult(
-            seriesPath, measurements, aggregationNames, tsDataType, timeFilter, context, ascending);
+            seriesPath,
+            allMeasurementsInDevice,
+            aggregationNames,
+            tsDataType,
+            timeFilter,
+            context,
+            ascending);
     int rstIndex = 0;
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregateResultList[i] = aggregateResult.get(rstIndex++);
     }
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 7d18a00..ffa461e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -87,8 +87,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
     try {
-      executor = new ClusterAggregateExecutor(plan, testMetaMember);
-      QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
+      executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithoutValueFilter(plan);
       assertTrue(queryDataSet.hasNext());
       RowRecord record = queryDataSet.next();
       List<Field> fields = record.getFields();
@@ -144,8 +144,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
     try {
-      executor = new ClusterAggregateExecutor(plan, testMetaMember);
-      QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
+      executor = new ClusterAggregateExecutor(context, plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithValueFilter(plan);
       assertTrue(queryDataSet.hasNext());
       RowRecord record = queryDataSet.next();
       List<Field> fields = record.getFields();
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 885ffe9..bbfd4c7 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -144,6 +144,13 @@ public class AlignedTimeseriesSessionExample {
     }
 
     dataSet.closeOperationHandle();
+    dataSet = session.executeQueryStatement("select count(*) from root.sg_1.d1.vector");
+    System.out.println(dataSet.getColumnNames());
+    while (dataSet.hasNext()) {
+      System.out.println(dataSet.next());
+    }
+
+    dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
             "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 0755c37..eebfd66 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -213,11 +213,13 @@ public class TimeSeriesMetadataCache {
           && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
         return Collections.emptyList();
       }
-      return readTimeseriesMetadataForVector(reader, key, subSensorList, allSensors);
+      // for the condition that cache is disabled, we only get what we need
+      Set<String> allSensorSet = new HashSet<>(subSensorList);
+      allSensorSet.add(key.measurement);
+      return readTimeseriesMetadataForVector(reader, key, subSensorList, allSensorSet);
     }
 
     List<TimeseriesMetadata> res = new ArrayList<>();
-
     getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
 
     if (res.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 95d9f5f..23cdb16 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
@@ -110,9 +109,9 @@ public class CompactionUtils {
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         IChunkReader chunkReader = new ChunkReaderByTimestamp(reader.readMemChunk(chunkMetadata));
         while (chunkReader.hasNextSatisfiedPage()) {
-          IPointReader iPointReader = new BatchDataIterator(chunkReader.nextPageData());
-          while (iPointReader.hasNextTimeValuePair()) {
-            TimeValuePair timeValuePair = iPointReader.nextTimeValuePair();
+          IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
+          while (batchIterator.hasNextTimeValuePair()) {
+            TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
             timeValuePairMap.put(timeValuePair.getTimestamp(), timeValuePair);
           }
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index dacdda7..73806ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1302,20 +1302,11 @@ public class MManager {
    * @param fullPaths full path list without uniting the sub measurement under the same aligned time
    *     series.
    * @return Size of partial path list could NOT equal to the input list size. For example, the
-   *     VectorMeasurementSchema (s1,s2) would be returned once; Size of integer list must equal to
-   *     the input list size. It indicates the index of elements of original list in the result list
+   *     VectorMeasurementSchema (s1,s2) would be returned once.
    */
-  public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchemas(List<PartialPath> fullPaths)
-      throws MetadataException {
+  public List<PartialPath> groupVectorPaths(List<PartialPath> fullPaths) throws MetadataException {
     Map<IMNode, PartialPath> nodeToPartialPath = new LinkedHashMap<>();
-    Map<String, Integer> pathIndex = new LinkedHashMap<>();
-    for (int i = 0; i < fullPaths.size(); i++) {
-      PartialPath path = fullPaths.get(i);
-      pathIndex.put(path.getExactFullPath(), i);
-      if (path.isMeasurementAliasExists()) {
-        pathIndex.put(path.getFullPathWithAlias(), i);
-      }
-
+    for (PartialPath path : fullPaths) {
       IMeasurementMNode node = getMeasurementMNode(path);
       if (!nodeToPartialPath.containsKey(node)) {
         nodeToPartialPath.put(node, path.copy());
@@ -1329,7 +1320,7 @@ public class MManager {
         }
       }
     }
-    return new Pair<>(new ArrayList<>(nodeToPartialPath.values()), pathIndex);
+    return new ArrayList<>(nodeToPartialPath.values());
   }
 
   // attention: this path must be a device node
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index bf78ca9..3f3282e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
@@ -149,21 +150,8 @@ public class QueryOperator extends Operator {
   }
 
   public void check() throws LogicalOperatorException {
-    if (isAlignByDevice()) {
-      if (selectComponent.hasTimeSeriesGeneratingFunction()) {
-        throw new LogicalOperatorException(
-            "ALIGN BY DEVICE clause is not supported in UDF queries.");
-      }
-
-      for (PartialPath path : selectComponent.getPaths()) {
-        String device = path.getDevice();
-        if (!device.isEmpty()) {
-          throw new LogicalOperatorException(
-              "The paths of the SELECT clause can only be single level. In other words, "
-                  + "the paths of the SELECT clause can only be measurements or STAR, without DOT."
-                  + " For more details please refer to the SQL document.");
-        }
-      }
+    if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
+      throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
     }
   }
 
@@ -227,7 +215,6 @@ public class QueryOperator extends Operator {
     List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
     List<ResultColumn> resultColumns = selectComponent.getResultColumns();
     List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
-
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
     Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
@@ -254,12 +241,14 @@ public class QueryOperator extends Operator {
         try {
           // remove stars in SELECT to get actual paths
           List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+          if (suffixPath.getNodes().length > 1
+              && (actualPaths.isEmpty() || !(actualPaths.get(0) instanceof VectorPartialPath))) {
+            throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+          }
           if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
             throw new QueryProcessException(
-                String.format(
-                    "alias %s can only be matched with one time series", resultColumn.getAlias()));
+                String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
           }
-
           if (actualPaths.isEmpty()) {
             String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
             if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
@@ -373,11 +362,15 @@ public class QueryOperator extends Operator {
         : (((FunctionExpression) expression).getPaths().get(0));
   }
 
+  /**
+   * If path is a vectorPartialPath, we return its measurementId + subMeasurement as the final
+   * measurement. e.g. path: root.sg.d1.vector1[s1], return "vector1.s1".
+   */
   private String getMeasurementName(PartialPath path, String aggregation) {
     String initialMeasurement = path.getMeasurement();
     if (path instanceof VectorPartialPath) {
       String subMeasurement = ((VectorPartialPath) path).getSubSensor(0);
-      initialMeasurement += "." + subMeasurement;
+      initialMeasurement += TsFileConstant.PATH_SEPARATOR + subMeasurement;
     }
     if (aggregation != null) {
       initialMeasurement = aggregation + "(" + initialMeasurement + ")";
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index bccd004..ea74ecb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -122,9 +122,4 @@ public class AggregationPlan extends RawDataQueryPlan {
     }
     return columnForDisplay;
   }
-
-  @Override
-  public boolean isRawQuery() {
-    return false;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index fd1bebd..288b402 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -29,6 +29,11 @@ import java.util.Map;
 
 public class AlignByDevicePlan extends QueryPlan {
 
+  public static final String MEASUREMENT_ERROR_MESSAGE =
+      "The paths of the SELECT clause can only be measurements or STAR.";
+  public static final String ALIAS_ERROR_MESSAGE =
+      "alias %s can only be matched with one time series";
+
   // to record result measurement columns, e.g. temperature, status, speed
   private List<String> measurements;
   private Map<String, MeasurementInfo> measurementInfoMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 300cc85..a952e7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -49,9 +49,4 @@ public class FillQueryPlan extends RawDataQueryPlan {
   public void setFillType(Map<TSDataType, IFill> fillType) {
     this.fillType = fillType;
   }
-
-  @Override
-  public boolean isRawQuery() {
-    return false;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index 21c3424..9f4f684 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -75,9 +75,4 @@ public class LastQueryPlan extends RawDataQueryPlan {
     }
     return false;
   }
-
-  @Override
-  public boolean isRawQuery() {
-    return false;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
index 8a931bf..27d20f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java
@@ -73,9 +73,4 @@ public class QueryIndexPlan extends RawDataQueryPlan {
   public String toString() {
     return String.format("Query paths: %s, index type: %s, props: %s", paths, indexType, props);
   }
-
-  @Override
-  public boolean isRawQuery() {
-    return false;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index aff2b51..52cb0d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -45,8 +45,6 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   private Map<String, Integer> pathToIndex = new HashMap<>();
 
-  private Map<String, Integer> vectorPathToIndex = new HashMap<>();
-
   private boolean enableRedirect = false;
 
   // if true, we don't need the row whose any column is null
@@ -150,14 +148,6 @@ public abstract class QueryPlan extends PhysicalPlan {
     this.enableRedirect = enableRedirect;
   }
 
-  public Map<String, Integer> getVectorPathToIndex() {
-    return vectorPathToIndex;
-  }
-
-  public void setVectorPathToIndex(Map<String, Integer> vectorPathToIndex) {
-    this.vectorPathToIndex = vectorPathToIndex;
-  }
-
   public List<ResultColumn> getResultColumns() {
     return resultColumns;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 5936633..4e59686 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -44,6 +44,7 @@ public class RawDataQueryPlan extends QueryPlan {
   private IExpression expression = null;
   private Map<String, Set<String>> deviceToMeasurements = new HashMap<>();
 
+  // TODO: remove this when all types of query supporting vector
   /** used to group all the sub sensors of one vector into VectorPartialPath */
   private List<PartialPath> deduplicatedVectorPaths = new ArrayList<>();
 
@@ -93,11 +94,9 @@ public class RawDataQueryPlan extends QueryPlan {
       }
     }
 
-    if (isRawQuery()) {
-      // if it is a RawQueryWithoutValueFilter, we also need to group all the subSensors of one
-      // vector into one VectorPartialPath
-      transformVectorPaths(physicalGenerator, columnForDisplaySet);
-    }
+    // if it is a RawQueryWithoutValueFilter, we also need to group all the subSensors of one
+    // vector into one VectorPartialPath
+    groupVectorPaths(physicalGenerator);
   }
 
   public IExpression getExpression() {
@@ -173,25 +172,13 @@ public class RawDataQueryPlan extends QueryPlan {
    * them directly into deduplicatedPaths and deduplicatedDataTypes, because we don't know whether
    * the raw query has value filter here.
    */
-  public void transformVectorPaths(
-      PhysicalGenerator physicalGenerator, Set<String> columnForDisplaySet)
-      throws MetadataException {
-    Pair<List<PartialPath>, Map<String, Integer>> pair =
-        physicalGenerator.getSeriesSchema(getDeduplicatedPaths());
-
-    List<PartialPath> vectorizedDeduplicatedPaths = pair.left;
+  public void groupVectorPaths(PhysicalGenerator physicalGenerator) throws MetadataException {
+    List<PartialPath> vectorizedDeduplicatedPaths =
+        physicalGenerator.groupVectorPaths(getDeduplicatedPaths());
     List<TSDataType> vectorizedDeduplicatedDataTypes =
         new ArrayList<>(physicalGenerator.getSeriesTypes(vectorizedDeduplicatedPaths));
     setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths);
     setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes);
-
-    Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right;
-    Map<String, Integer> vectorPathToIndex = new HashMap<>();
-    for (String columnForDisplay : columnForDisplaySet) {
-      vectorPathToIndex.put(
-          columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay));
-    }
-    setVectorPathToIndex(vectorPathToIndex);
   }
 
   public List<PartialPath> getDeduplicatedVectorPaths() {
@@ -218,11 +205,6 @@ public class RawDataQueryPlan extends QueryPlan {
     if (!this.deduplicatedVectorPaths.isEmpty()) {
       this.deduplicatedPaths = this.deduplicatedVectorPaths;
       this.deduplicatedDataTypes = this.deduplicatedVectorDataTypes;
-      setPathToIndex(getVectorPathToIndex());
     }
   }
-
-  public boolean isRawQuery() {
-    return true;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 57aeff8..502d996 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -129,8 +129,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
     return pathNameToReaderIndex.get(pathName);
   }
 
-  @Override
-  public boolean isRawQuery() {
-    return false;
+  public void setPathNameToReaderIndex(Map<String, Integer> pathNameToReaderIndex) {
+    this.pathNameToReaderIndex = pathNameToReaderIndex;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index abbb074..335de0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -29,10 +29,8 @@ import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurati
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
-import java.util.Map;
 
 /** Used to convert logical operator to physical plan */
 public class PhysicalGenerator {
@@ -60,8 +58,7 @@ public class PhysicalGenerator {
     return SchemaUtils.getSeriesTypesByPaths(paths);
   }
 
-  public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
-      throws MetadataException {
-    return IoTDB.metaManager.getSeriesSchemas(paths);
+  public List<PartialPath> groupVectorPaths(List<PartialPath> paths) throws MetadataException {
+    return IoTDB.metaManager.groupVectorPaths(paths);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 8016be7..a1f000f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -72,20 +72,20 @@ public abstract class AggregateResult {
   /**
    * Aggregate results cannot be calculated using Statistics directly, using the data in each page
    *
-   * @param dataInThisPage the data in Page
+   * @param batchIterator the data in Page
    */
-  public abstract void updateResultFromPageData(BatchData dataInThisPage)
+  public abstract void updateResultFromPageData(IBatchDataIterator batchIterator)
       throws IOException, QueryProcessException;
 
   /**
    * Aggregate results cannot be calculated using Statistics directly, using the data in each page
    *
-   * @param dataInThisPage the data in Page
+   * @param batchIterator the data in Page
    * @param minBound calculate points whose time >= bound
    * @param maxBound calculate points whose time < bound
    */
   public abstract void updateResultFromPageData(
-      BatchData dataInThisPage, long minBound, long maxBound) throws IOException;
+      IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException;
 
   /**
    * This method calculates the aggregation using common timestamps of the cross series filter.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index d808c0b..a1fffa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -82,18 +82,19 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent()) {
-      if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext()) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
       }
-      updateAvg(seriesDataType, dataInThisPage.currentValue());
-      dataInThisPage.next();
+      updateAvg(seriesDataType, batchIterator.currentValue());
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 00bfcad..10b9ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -49,19 +49,20 @@ public class CountAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    setLongValue(getLongValue() + dataInThisPage.length());
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    setLongValue(getLongValue() + batchIterator.totalLength());
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     int cnt = 0;
-    while (dataInThisPage.hasCurrent()) {
-      if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+    while (batchIterator.hasNext()) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
       }
       cnt++;
-      dataInThisPage.next();
+      batchIterator.next();
     }
     setLongValue(getLongValue() + cnt);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index fe235e3..144fe00 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -93,20 +93,21 @@ public class ExtremeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     Comparable<Object> extVal = null;
 
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
 
-      extVal = getExtremeValue(extVal, (Comparable<Object>) dataInThisPage.currentValue());
-      dataInThisPage.next();
+      extVal = getExtremeValue(extVal, (Comparable<Object>) batchIterator.currentValue());
+      batchIterator.next();
     }
     updateResult(extVal);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 56ce8c5..f8a2e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -64,27 +64,28 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
     if (hasFinalResult()) {
       return;
     }
-    if (dataInThisPage.hasCurrent()) {
-      setValue(dataInThisPage.currentValue());
-      timestamp = dataInThisPage.currentTime();
+    if (batchIterator.hasNext()) {
+      setValue(batchIterator.currentValue());
+      timestamp = batchIterator.currentTime();
     }
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     if (hasFinalResult()) {
       return;
     }
-    if (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      setValue(dataInThisPage.currentValue());
-      timestamp = dataInThisPage.currentTime();
-      dataInThisPage.next();
+    if (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      setValue(batchIterator.currentValue());
+      timestamp = batchIterator.currentTime();
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 8eae923..dc20617 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 
@@ -40,13 +40,14 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      setValue(dataInThisPage.currentValue());
-      timestamp = dataInThisPage.currentTime();
-      dataInThisPage.next();
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      setValue(batchIterator.currentValue());
+      timestamp = batchIterator.currentTime();
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index ad06ace..443751c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -60,20 +60,21 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     long time = Long.MIN_VALUE;
     Object lastVal = null;
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      time = dataInThisPage.currentTime();
-      lastVal = dataInThisPage.currentValue();
-      dataInThisPage.next();
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      time = batchIterator.currentTime();
+      lastVal = batchIterator.currentValue();
+      batchIterator.next();
     }
 
     if (time != Long.MIN_VALUE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 587c7a3..981167b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 
@@ -42,18 +42,19 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     if (hasFinalResult()) {
       return;
     }
     long time = Long.MIN_VALUE;
     Object lastVal = null;
-    if (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      time = dataInThisPage.currentTime();
-      lastVal = dataInThisPage.currentValue();
-      dataInThisPage.next();
+    if (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      time = batchIterator.currentTime();
+      lastVal = batchIterator.currentValue();
+      batchIterator.next();
     }
 
     if (time != Long.MIN_VALUE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index fad90de..46ebe15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -49,17 +49,18 @@ public class MaxTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      updateMaxTimeResult(dataInThisPage.currentTime());
-      dataInThisPage.next();
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      updateMaxTimeResult(batchIterator.currentTime());
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index a69dcc9..e867bf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 
@@ -35,14 +35,15 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     if (hasFinalResult()) {
       return;
     }
-    if (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      updateMaxTimeResult(dataInThisPage.currentTime());
+    if (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      updateMaxTimeResult(batchIterator.currentTime());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 34d9622..a61583b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -49,21 +49,22 @@ public class MaxValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     Comparable<Object> maxVal = null;
 
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) < 0) {
-        maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      if (maxVal == null || maxVal.compareTo(batchIterator.currentValue()) < 0) {
+        maxVal = (Comparable<Object>) batchIterator.currentValue();
       }
-      dataInThisPage.next();
+      batchIterator.next();
     }
     updateResult(maxVal);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 37883d5..4d0365f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -52,19 +52,20 @@ public class MinTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
     if (hasFinalResult()) {
       return;
     }
-    if (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      setLongValue(dataInThisPage.currentTime());
+    if (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      setLongValue(batchIterator.currentTime());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 6b45f14..9abceb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 
@@ -33,10 +33,11 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent() && dataInThisPage.currentTime() >= minBound) {
-      setValue(dataInThisPage.currentTime());
-      dataInThisPage.next();
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext() && batchIterator.currentTime() >= minBound) {
+      setValue(batchIterator.currentTime());
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index eefbbb1..f3c01ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -49,17 +49,18 @@ public class MinValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent()
-        && dataInThisPage.currentTime() < maxBound
-        && dataInThisPage.currentTime() >= minBound) {
-      updateResult((Comparable<Object>) dataInThisPage.currentValue());
-      dataInThisPage.next();
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext()
+        && batchIterator.currentTime() < maxBound
+        && batchIterator.currentTime() >= minBound) {
+      updateResult((Comparable<Object>) batchIterator.currentValue());
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 8a11502..2590d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -62,18 +62,19 @@ public class SumAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage) {
-    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  public void updateResultFromPageData(IBatchDataIterator batchIterator) {
+    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
-  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
-    while (dataInThisPage.hasCurrent()) {
-      if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
+  public void updateResultFromPageData(
+      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+    while (batchIterator.hasNext()) {
+      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
       }
-      updateSum(dataInThisPage.currentValue());
-      dataInThisPage.next();
+      updateSum(batchIterator.currentValue());
+      batchIterator.next();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 86008d1..7ab4095 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -224,6 +225,11 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     return false;
   }
 
+  /**
+   * Gets all measurements under the given device. For a VectorMeasurementSchema, each
+   * sub-measurement is returned prefixed with the measurementId, e.g. for schema vector1[s1, s2]
+   * the result is ["vector1.s1", "vector1.s2"].
+   */
   protected Set<String> getMeasurementsUnderGivenDevice(PartialPath device) throws IOException {
     try {
       Set<String> res = new HashSet<>();
@@ -233,7 +239,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       for (IMeasurementSchema schema : measurementSchemas) {
         if (schema instanceof VectorMeasurementSchema) {
           for (String subMeasurement : schema.getSubMeasurementsList()) {
-            res.add(schema.getMeasurementId() + "." + subMeasurement);
+            res.add(schema.getMeasurementId() + TsFileConstant.PATH_SEPARATOR + subMeasurement);
           }
         } else {
           res.add(schema.getMeasurementId());
@@ -246,6 +252,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
   }
 
+  /**
+   * Note: for a vector path such as root.sg.d1.vector1.s1, the device is root.sg.d1 and the
+   * measurement is "vector1.s1".
+   */
   private PartialPath transformPath(PartialPath device, String measurement) throws IOException {
     try {
       PartialPath fullPath = new PartialPath(device.getFullPath(), measurement);
@@ -253,7 +263,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       if (schema instanceof MeasurementSchema) {
         return fullPath;
       } else {
-        return new VectorPartialPath(fullPath.getDevice(), fullPath.getMeasurement());
+        String vectorPath = fullPath.getDevice();
+        return new VectorPartialPath(vectorPath, fullPath.getMeasurement());
       }
     } catch (MetadataException e) {
       throw new IOException("Cannot get node from " + device, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index b8cec3d..ef7704e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -133,19 +134,20 @@ public class LocalGroupByExecutor implements GroupByExecutor {
       }
       // lazy reset batch data for calculation
       batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
+      IBatchDataIterator batchIterator = batchData.getBatchDataIterator();
       if (ascending) {
         // skip points that cannot be calculated
-        while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
-          batchData.next();
+        while (batchIterator.hasNext() && batchIterator.currentTime() < curStartTime) {
+          batchIterator.next();
         }
       } else {
-        while (batchData.hasCurrent() && batchData.currentTime() >= curEndTime) {
-          batchData.next();
+        while (batchIterator.hasNext() && batchIterator.currentTime() >= curEndTime) {
+          batchIterator.next();
         }
       }
 
-      if (batchData.hasCurrent()) {
-        result.updateResultFromPageData(batchData, curStartTime, curEndTime);
+      if (batchIterator.hasNext()) {
+        result.updateResultFromPageData(batchIterator, curStartTime, curEndTime);
       }
     }
     lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c01f3c3..0b711f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -40,12 +41,14 @@ import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
@@ -72,26 +75,25 @@ public class AggregationExecutor {
   protected List<String> aggregations;
   protected IExpression expression;
   protected boolean ascending;
+  protected QueryContext context;
+  protected AggregateResult[] aggregateResultList;
 
   /** aggregation batch calculation size. */
   private int aggregateFetchSize;
 
-  protected AggregationExecutor(AggregationPlan aggregationPlan) {
+  protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
     this.expression = aggregationPlan.getExpression();
     this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
     this.ascending = aggregationPlan.isAscending();
+    this.context = context;
+    this.aggregateResultList = new AggregateResult[selectedSeries.size()];
   }
 
-  /**
-   * execute aggregate function with only time filter or no filter.
-   *
-   * @param context query context
-   */
-  public QueryDataSet executeWithoutValueFilter(
-      QueryContext context, AggregationPlan aggregationPlan)
+  /** execute aggregate function with only time filter or no filter. */
+  public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
       throws StorageEngineException, IOException, QueryProcessException {
 
     Filter timeFilter = null;
@@ -102,18 +104,29 @@ public class AggregationExecutor {
     // TODO use multi-thread
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
-    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
     // TODO-Cluster: group the paths by storage group to reduce communications
     List<StorageGroupProcessor> list =
         StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+
+    // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
+    Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+        groupVectorSeries(pathToAggrIndexesMap);
     try {
       for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+        PartialPath seriesPath = entry.getKey();
         aggregateOneSeries(
-            entry,
-            aggregateResultList,
-            aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
-            timeFilter,
-            context);
+            seriesPath,
+            entry.getValue(),
+            aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
+            timeFilter);
+      }
+      for (Map.Entry<PartialPath, List<List<Integer>>> entry : vectorPathIndexesMap.entrySet()) {
+        VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey();
+        aggregateOneVectorSeries(
+            vectorSeries,
+            entry.getValue(),
+            aggregationPlan.getAllMeasurementsInDevice(vectorSeries.getDevice()),
+            timeFilter);
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
@@ -125,25 +138,20 @@ public class AggregationExecutor {
   /**
    * get aggregation result for one series
    *
-   * @param pathToAggrIndexes entry of path to aggregation indexes map
    * @param timeFilter time filter
-   * @param context query context
    */
   protected void aggregateOneSeries(
-      Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
-      AggregateResult[] aggregateResultList,
-      Set<String> measurements,
-      Filter timeFilter,
-      QueryContext context)
+      PartialPath seriesPath,
+      List<Integer> indexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
       throws IOException, QueryProcessException, StorageEngineException {
     List<AggregateResult> ascAggregateResultList = new ArrayList<>();
     List<AggregateResult> descAggregateResultList = new ArrayList<>();
     boolean[] isAsc = new boolean[aggregateResultList.length];
 
-    PartialPath seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
-
-    for (int i : pathToAggrIndexes.getValue()) {
+    TSDataType tsDataType = dataTypes.get(indexes.get(0));
+    for (int i : indexes) {
       // construct AggregateResult
       AggregateResult aggregateResult =
           AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
@@ -156,7 +164,7 @@ public class AggregationExecutor {
     }
     aggregateOneSeries(
         seriesPath,
-        measurements,
+        allMeasurementsInDevice,
         context,
         timeFilter,
         tsDataType,
@@ -166,7 +174,7 @@ public class AggregationExecutor {
 
     int ascIndex = 0;
     int descIndex = 0;
-    for (int i : pathToAggrIndexes.getValue()) {
+    for (int i : indexes) {
       aggregateResultList[i] =
           isAsc[i]
               ? ascAggregateResultList.get(ascIndex++)
@@ -174,6 +182,58 @@ public class AggregationExecutor {
     }
   }
 
+  /**
+   * Aggregates every sub-series of one vector path into aggregateResultList.
+   *
+   * @param subIndexes for each sub-series, the indexes of the aggregations computed on it
+   */
+  protected void aggregateOneVectorSeries(
+      VectorPartialPath seriesPath,
+      List<List<Integer>> subIndexes,
+      Set<String> allMeasurementsInDevice,
+      Filter timeFilter)
+      throws IOException, QueryProcessException, StorageEngineException {
+    List<List<AggregateResult>> ascResults = new ArrayList<>();
+    List<List<AggregateResult>> descResults = new ArrayList<>();
+    boolean[] ascendingFlags = new boolean[aggregateResultList.length];
+
+    for (List<Integer> indexesOfSub : subIndexes) {
+      TSDataType subType = dataTypes.get(indexesOfSub.get(0));
+      List<AggregateResult> ascOfSub = new ArrayList<>();
+      List<AggregateResult> descOfSub = new ArrayList<>();
+      for (int idx : indexesOfSub) {
+        AggregateResult created =
+            AggregateResultFactory.getAggrResultByName(aggregations.get(idx), subType);
+        boolean asc = created.isAscending();
+        (asc ? ascOfSub : descOfSub).add(created);
+        ascendingFlags[idx] |= asc;
+      }
+      ascResults.add(ascOfSub);
+      descResults.add(descOfSub);
+    }
+
+    aggregateOneVectorSeries(
+        seriesPath,
+        allMeasurementsInDevice,
+        context,
+        timeFilter,
+        TSDataType.VECTOR,
+        ascResults,
+        descResults,
+        null);
+
+    for (int sub = 0; sub < subIndexes.size(); sub++) {
+      List<AggregateResult> ascOfSub = ascResults.get(sub);
+      List<AggregateResult> descOfSub = descResults.get(sub);
+      int nextAsc = 0;
+      int nextDesc = 0;
+      for (int idx : subIndexes.get(sub)) {
+        aggregateResultList[idx] =
+            ascendingFlags[idx] ? ascOfSub.get(nextAsc++) : descOfSub.get(nextDesc++);
+      }
+    }
+  }
+
   @SuppressWarnings("squid:S107")
   public static void aggregateOneSeries(
       PartialPath seriesPath,
@@ -225,6 +285,65 @@ public class AggregationExecutor {
     }
   }
 
+  /**
+   * Aggregates one vector series without a value filter. Results in ascAggregateResultList are
+   * computed with an ascending reader, results in descAggregateResultList with a descending one;
+   * a direction whose sub-lists are all empty gets no reader at all.
+   *
+   * <p>The file filter is only applied to the query data source; both readers are created with
+   * null for their value filter and file filter.
+   *
+   * @param seriesPath vector path to aggregate
+   * @param measurements handed to the reader as its set of all sensors
+   * @param context query context
+   * @param timeFilter time filter; it is updated with the data source's TTL before reading
+   * @param tsDataType data type handed to the reader
+   * @param ascAggregateResultList sub-lists of results to compute in ascending order
+   * @param descAggregateResultList sub-lists of results to compute in descending order
+   * @param fileFilter when not null, used to filter the query data source
+   */
+  public static void aggregateOneVectorSeries(
+      VectorPartialPath seriesPath,
+      Set<String> measurements,
+      QueryContext context,
+      Filter timeFilter,
+      TSDataType tsDataType,
+      List<List<AggregateResult>> ascAggregateResultList,
+      List<List<AggregateResult>> descAggregateResultList,
+      TsFileFilter fileFilter)
+      throws StorageEngineException, IOException, QueryProcessException {
+
+    // construct the data source shared by both readers
+    QueryDataSource source =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    if (fileFilter != null) {
+      QueryUtils.filterQueryDataSource(source, fileFilter);
+    }
+    // update filter by TTL
+    Filter filter = source.updateFilterUsingTTL(timeFilter);
+
+    // ascending pass first, then the descending one, each with its own reader
+    for (boolean asc : new boolean[] {true, false}) {
+      List<List<AggregateResult>> results =
+          asc ? ascAggregateResultList : descAggregateResultList;
+      if (!isAggregateResultEmpty(results)) {
+        aggregateFromVectorReader(
+            new VectorSeriesAggregateReader(
+                seriesPath, measurements, tsDataType, context, source, filter, null, null, asc),
+            results);
+      }
+    }
+  }
+
+  /**
+   * Tells whether every inner list of {@code resultList} is empty.
+   *
+   * @return true if no inner list holds an aggregate result, including when there are no lists
+   */
+  private static boolean isAggregateResultEmpty(List<List<AggregateResult>> resultList) {
+    return resultList.stream().allMatch(List::isEmpty);
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private static void aggregateFromReader(
       IAggregateReader seriesReader, List<AggregateResult> aggregateResultList)
@@ -270,6 +389,69 @@ public class AggregationExecutor {
     }
   }
 
+  private static void aggregateFromVectorReader(
+      VectorSeriesAggregateReader seriesReader, List<List<AggregateResult>> aggregateResultList)
+      throws QueryProcessException, IOException {
+    int remainingToCalculate = 0;
+    List<boolean[]> isCalculatedArray = new ArrayList<>();
+    for (List<AggregateResult> subAggregateResults : aggregateResultList) {
+      remainingToCalculate += subAggregateResults.size();
+      boolean[] subCalculatedArray = new boolean[subAggregateResults.size()];
+      isCalculatedArray.add(subCalculatedArray);
+    }
+
+    while (seriesReader.hasNextFile()) {
+      // aggregate each sub-series from file-level statistics, then skip the whole file
+      if (seriesReader.canUseCurrentFileStatistics()) {
+        while (seriesReader.hasNextSubSeries()) {
+          Statistics fileStatistics = seriesReader.currentFileStatistics();
+          remainingToCalculate =
+              aggregateStatistics(
+                  aggregateResultList.get(seriesReader.getCurIndex()),
+                  isCalculatedArray.get(seriesReader.getCurIndex()),
+                  remainingToCalculate,
+                  fileStatistics);
+          if (remainingToCalculate == 0) {
+            seriesReader.resetIndex();
+            return;
+          }
+          seriesReader.nextSeries();
+        }
+        seriesReader.skipCurrentFile();
+        continue;
+      }
+
+      while (seriesReader.hasNextChunk()) {
+        // aggregate each sub-series from chunk-level statistics, then skip the chunk
+        if (seriesReader.canUseCurrentChunkStatistics()) {
+          while (seriesReader.hasNextSubSeries()) {
+            Statistics chunkStatistics = seriesReader.currentChunkStatistics();
+            remainingToCalculate =
+                aggregateStatistics(
+                    aggregateResultList.get(seriesReader.getCurIndex()),
+                    isCalculatedArray.get(seriesReader.getCurIndex()),
+                    remainingToCalculate,
+                    chunkStatistics);
+            if (remainingToCalculate == 0) {
+              seriesReader.resetIndex();
+              return;
+            }
+            seriesReader.nextSeries();
+          }
+          seriesReader.skipCurrentChunk();
+          continue;
+        }
+
+        remainingToCalculate =
+            aggregateVectorPages(
+                seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate);
+        if (remainingToCalculate == 0) {
+          return;
+        }
+      }
+    }
+  }
+
   /** Aggregate each result in the list with the statistics */
   private static int aggregateStatistics(
       List<AggregateResult> aggregateResultList,
@@ -314,31 +496,87 @@ public class AggregationExecutor {
         seriesReader.skipCurrentPage();
         continue;
       }
-      BatchData nextOverlappedPageData = seriesReader.nextPage();
-      for (int i = 0; i < aggregateResultList.size(); i++) {
-        if (!isCalculatedArray[i]) {
-          AggregateResult aggregateResult = aggregateResultList.get(i);
-          aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-          nextOverlappedPageData.resetBatchData();
-          if (aggregateResult.hasFinalResult()) {
-            isCalculatedArray[i] = true;
-            remainingToCalculate--;
-            if (remainingToCalculate == 0) {
-              return 0;
-            }
+      IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+      remainingToCalculate =
+          aggregateBatchData(
+              aggregateResultList, isCalculatedArray, remainingToCalculate, batchDataIterator);
+    }
+    return remainingToCalculate;
+  }
+
+  private static int aggregateVectorPages(
+      VectorSeriesAggregateReader seriesReader,
+      List<List<AggregateResult>> aggregateResultList,
+      List<boolean[]> isCalculatedArray,
+      int remainingToCalculate)
+      throws IOException, QueryProcessException {
+    while (seriesReader.hasNextPage()) {
+      // aggregate each sub-series from page-level statistics, then skip the page
+      if (seriesReader.canUseCurrentPageStatistics()) {
+        while (seriesReader.hasNextSubSeries()) {
+          Statistics pageStatistic = seriesReader.currentPageStatistics();
+          remainingToCalculate =
+              aggregateStatistics(
+                  aggregateResultList.get(seriesReader.getCurIndex()),
+                  isCalculatedArray.get(seriesReader.getCurIndex()),
+                  remainingToCalculate,
+                  pageStatistic);
+          if (remainingToCalculate == 0) {
+            seriesReader.resetIndex();
+            return 0;
           }
+          seriesReader.nextSeries();
         }
+        seriesReader.skipCurrentPage();
+        continue;
+      }
+
+      BatchData nextOverlappedPageData = seriesReader.nextPage();
+      while (seriesReader.hasNextSubSeries()) {
+        int subIndex = seriesReader.getCurIndex();
+        IBatchDataIterator batchIterator = nextOverlappedPageData.getBatchDataIterator(subIndex);
+        remainingToCalculate =
+            aggregateBatchData(
+                aggregateResultList.get(subIndex),
+                isCalculatedArray.get(subIndex),
+                remainingToCalculate,
+                batchIterator);
+        if (remainingToCalculate == 0) {
+          seriesReader.resetIndex();
+          return 0;
+        }
+        seriesReader.nextSeries();
       }
     }
     return remainingToCalculate;
   }
 
-  /**
-   * execute aggregate function with value filter.
-   *
-   * @param context query context.
-   */
-  public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
+  /** Updates each unfinished result from the batch and returns the number still unfinished. */
+  private static int aggregateBatchData(
+      List<AggregateResult> aggregateResultList,
+      boolean[] isCalculatedArray,
+      int remainingToCalculate,
+      IBatchDataIterator batchIterator)
+      throws QueryProcessException, IOException {
+    for (int i = 0; i < aggregateResultList.size(); i++) {
+      if (!isCalculatedArray[i]) {
+        AggregateResult aggregateResult = aggregateResultList.get(i);
+        aggregateResult.updateResultFromPageData(batchIterator);
+        batchIterator.reset();
+        if (aggregateResult.hasFinalResult()) {
+          isCalculatedArray[i] = true;
+          remainingToCalculate--;
+          if (remainingToCalculate == 0) {
+            return 0;
+          }
+        }
+      }
+    }
+    return remainingToCalculate;
+  }
+
+  /** execute aggregate function with value filter. */
+  public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
     optimizeLastElementFunc(queryPlan);
 
@@ -362,15 +600,13 @@ public class AggregationExecutor {
       StorageEngine.getInstance().mergeUnLock(list);
     }
 
-    List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
-      AggregateResult result =
+      aggregateResultList[i] =
           AggregateResultFactory.getAggrResultByName(
               aggregations.get(i), dataTypes.get(i), ascending);
-      aggregateResults.add(result);
     }
-    aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
-    return constructDataSet(aggregateResults, queryPlan);
+    aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap);
+    return constructDataSet(Arrays.asList(aggregateResultList), queryPlan);
   }
 
   private void optimizeLastElementFunc(QueryPlan queryPlan) {
@@ -408,7 +644,6 @@ public class AggregationExecutor {
 
   /** calculate aggregation result with value filter. */
   private void aggregateWithValueFilter(
-      List<AggregateResult> aggregateResults,
       TimeGenerator timestampGenerator,
       Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
       throws IOException {
@@ -435,17 +670,16 @@ public class AggregationExecutor {
         if (cached.get(pathId)) {
           Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
           for (Integer i : entry.getValue()) {
-            aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+            aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
           }
         } else {
           if (entry.getValue().size() == 1) {
-            aggregateResults
-                .get(entry.getValue().get(0))
-                .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+            aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
+                timeArray, timeArrayLength, entry.getKey());
           } else {
             Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
             for (Integer i : entry.getValue()) {
-              aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+              aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
             }
           }
         }
@@ -511,4 +745,35 @@ public class AggregationExecutor {
     }
     return pathToAggrIndexesMap;
   }
+
+  /**
+   * Merges the VectorPartialPath keys that share a full path into a single copied path that
+   * carries all their sub-sensors, and removes those keys from pathToAggrIndexesMap (mutated).
+   * E.g. vector1[s1] -> [1,3], vector1[s2] -> [2,4] yields vector1[s1,s2] -> [[1,3], [2,4]].
+   */
+  private Map<PartialPath, List<List<Integer>>> groupVectorSeries(
+      Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
+    Map<PartialPath, List<List<Integer>>> result = new HashMap<>();
+    Map<String, VectorPartialPath> temp = new HashMap<>();
+
+    List<PartialPath> seriesPaths = new ArrayList<>(pathToAggrIndexesMap.keySet());
+    for (PartialPath seriesPath : seriesPaths) {
+      if (seriesPath instanceof VectorPartialPath) {
+        List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
+        VectorPartialPath groupPath = temp.get(seriesPath.getFullPath());
+        if (groupPath == null) {
+          groupPath = (VectorPartialPath) seriesPath.copy();
+          temp.put(seriesPath.getFullPath(), groupPath);
+          result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
+        } else {
+          // groupPath is a key of result and is modified below: remove it, then re-insert it
+          List<List<Integer>> subIndexes = result.remove(groupPath);
+          subIndexes.add(indexes);
+          groupPath.addSubSensor(((VectorPartialPath) seriesPath).getSubSensorsList());
+          result.put(groupPath, subIndexes);
+        }
+      }
+    }
+    return result;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d2ae91f..a134d8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -139,22 +139,23 @@ public class QueryRouter implements IQueryRouter {
 
     aggregationPlan.setExpression(optimizedExpression);
 
-    AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan);
+    AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan);
 
     QueryDataSet dataSet = null;
 
     if (optimizedExpression != null
         && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
-      dataSet = engineExecutor.executeWithValueFilter(context, aggregationPlan);
+      dataSet = engineExecutor.executeWithValueFilter(aggregationPlan);
     } else {
-      dataSet = engineExecutor.executeWithoutValueFilter(context, aggregationPlan);
+      dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan);
     }
 
     return dataSet;
   }
 
-  protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) {
-    return new AggregationExecutor(aggregationPlan);
+  protected AggregationExecutor getAggregationExecutor(
+      QueryContext context, AggregationPlan aggregationPlan) {
+    return new AggregationExecutor(context, aggregationPlan);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index 867ad9d..c48b88e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -37,7 +37,7 @@ public interface IAggregateReader {
 
   boolean canUseCurrentChunkStatistics() throws IOException;
 
-  Statistics currentChunkStatistics();
+  Statistics currentChunkStatistics() throws IOException;
 
   void skipCurrentChunk();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index ebd3db0..b6f2d3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -43,6 +45,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -273,6 +276,13 @@ public class SeriesReader {
     return firstTimeSeriesMetadata.getStatistics();
   }
 
+  Statistics currentFileStatistics(int index) throws IOException {
+    if (firstTimeSeriesMetadata instanceof VectorTimeSeriesMetadata) {
+      return ((VectorTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
+    }
+    throw new IOException("Can only get statistics by index from vectorTimeSeriesMetaData");
+  }
+
   boolean currentFileModified() throws IOException {
     if (firstTimeSeriesMetadata == null) {
       throw new IOException("no first file");
@@ -394,6 +404,13 @@ public class SeriesReader {
     return firstChunkMetadata.getStatistics();
   }
 
+  Statistics currentChunkStatistics(int index) throws IOException {
+    if (firstChunkMetadata instanceof VectorChunkMetadata) {
+      return ((VectorChunkMetadata) firstChunkMetadata).getStatistics(index);
+    }
+    throw new IOException("Can only get statistics by index from vectorChunkMetaData");
+  }
+
   boolean currentChunkModified() throws IOException {
     if (firstChunkMetadata == null) {
       throw new IOException("no first chunk");
@@ -615,6 +632,16 @@ public class SeriesReader {
     return firstPageReader.getStatistics();
   }
 
+  /** Returns the first page reader's statistics at {@code index}, or null if it is absent. */
+  Statistics currentPageStatistics(int index) throws IOException {
+    if (firstPageReader == null) {
+      return null;
+    } else if (firstPageReader.isVectorPageReader()) {
+      return firstPageReader.getStatistics(index);
+    }
+    throw new IOException("Can only get statistics by index from VectorPageReader");
+  }
+
   boolean currentPageModified() throws IOException {
     if (firstPageReader == null) {
       throw new IOException("no first page");
@@ -1023,10 +1050,21 @@ public class SeriesReader {
       this.isSeq = isSeq;
     }
 
+    public boolean isVectorPageReader() {
+      return data instanceof VectorPageReader;
+    }
+
     Statistics getStatistics() {
       return data.getStatistics();
     }
 
+    Statistics getStatistics(int index) throws IOException {
+      if (data instanceof VectorPageReader) {
+        return ((VectorPageReader) data).getStatistics(index);
+      }
+      throw new IOException("Can only get statistics by index from VectorPageReader");
+    }
+
     BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
       return data.getAllSatisfiedPageData(ascending);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
new file mode 100644
index 0000000..23b1243
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class VectorSeriesAggregateReader implements IAggregateReader {
+
+  private final SeriesReader seriesReader;
+  /**
+   * Index of the sub-sensor currently being traversed. hasNextSubSeries() tells whether this index
+   * is below the sub-sensor count and resets it to 0 otherwise; nextSeries() advances it.
+   */
+  private int curIndex = 0;
+
+  private final int subSensorSize;
+
+  public VectorSeriesAggregateReader(
+      VectorPartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    this.seriesReader =
+        new SeriesReader(
+            seriesPath,
+            allSensors,
+            dataType,
+            context,
+            dataSource,
+            timeFilter,
+            valueFilter,
+            fileFilter,
+            ascending);
+    this.subSensorSize = seriesPath.getSubSensorsList().size();
+  }
+
+  @Override
+  public boolean isAscending() {
+    return seriesReader.getOrderUtils().getAscending();
+  }
+
+  @Override
+  public boolean hasNextFile() throws IOException {
+    return seriesReader.hasNextFile();
+  }
+
+  @Override
+  public boolean canUseCurrentFileStatistics() throws IOException {
+    Statistics fileStatistics = currentFileStatistics();
+    return !seriesReader.isFileOverlapped()
+        && containedByTimeFilter(fileStatistics)
+        && !seriesReader.currentFileModified();
+  }
+
+  @Override
+  public Statistics currentFileStatistics() throws IOException {
+    return seriesReader.currentFileStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentFile() {
+    seriesReader.skipCurrentFile();
+  }
+
+  @Override
+  public boolean hasNextChunk() throws IOException {
+    return seriesReader.hasNextChunk();
+  }
+
+  @Override
+  public boolean canUseCurrentChunkStatistics() throws IOException {
+    Statistics chunkStatistics = currentChunkStatistics();
+    return !seriesReader.isChunkOverlapped()
+        && containedByTimeFilter(chunkStatistics)
+        && !seriesReader.currentChunkModified();
+  }
+
+  @Override
+  public Statistics currentChunkStatistics() throws IOException {
+    return seriesReader.currentChunkStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentChunk() {
+    seriesReader.skipCurrentChunk();
+  }
+
+  @Override
+  public boolean hasNextPage() throws IOException {
+    return seriesReader.hasNextPage();
+  }
+
+  @Override
+  public boolean canUseCurrentPageStatistics() throws IOException {
+    Statistics currentPageStatistics = currentPageStatistics();
+    if (currentPageStatistics == null) {
+      return false;
+    }
+    return !seriesReader.isPageOverlapped()
+        && containedByTimeFilter(currentPageStatistics)
+        && !seriesReader.currentPageModified();
+  }
+
+  @Override
+  public Statistics currentPageStatistics() throws IOException {
+    return seriesReader.currentPageStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentPage() {
+    seriesReader.skipCurrentPage();
+  }
+
+  @Override
+  public BatchData nextPage() throws IOException {
+    return seriesReader.nextPage().flip();
+  }
+
+  private boolean containedByTimeFilter(Statistics statistics) {
+    Filter timeFilter = seriesReader.getTimeFilter();
+    return timeFilter == null
+        || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
+  }
+
+  public boolean hasNextSubSeries() {
+    if (getCurIndex() < subSensorSize) {
+      return true;
+    } else {
+      resetIndex();
+      return false;
+    }
+  }
+
+  public void nextSeries() {
+    curIndex++;
+  }
+
+  public int getCurIndex() {
+    return curIndex;
+  }
+
+  public void resetIndex() {
+    curIndex = 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
index 06fd982..08ae2c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
@@ -37,16 +37,9 @@ public class DescPriorityMergeReader extends PriorityMergeReader {
             });
   }
 
-  /**
-   * @param reader
-   * @param priority
-   * @param endTime
-   * @param queryContext
-   * @throws IOException
-   */
   @Override
   public void addReader(
-      IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext queryContext)
+      IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context)
       throws IOException {
     if (reader.hasNextTimeValuePair()) {
       heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 2333032..f144216 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -667,24 +667,6 @@ public class IoTDBAlignByDeviceIT {
   }
 
   @Test
-  public void errorCaseTest1() throws ClassNotFoundException {
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      statement.execute("select d0.s1, d0.s2, d1.s0 from root.vehicle align by device");
-      fail("No exception thrown.");
-    } catch (Exception e) {
-      Assert.assertTrue(
-          e.getMessage()
-              .contains(
-                  "The paths of the SELECT clause can only be single level. In other words, "
-                      + "the paths of the SELECT clause can only be measurements or STAR, without DOT."));
-    }
-  }
-
-  @Test
   public void errorCaseTest3() throws ClassNotFoundException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     try (Connection connection =
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index a77d592..2bd0a08 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -120,10 +120,10 @@ public class SeriesAggregateReaderTest {
             }
 
             while (seriesReader.hasNextPage()) {
-              BatchData nextOverlappedPageData = seriesReader.nextPage();
-              aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-              nextOverlappedPageData.resetBatchData();
-              assertTrue(nextOverlappedPageData.hasCurrent());
+              IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
+              aggregateResult.updateResultFromPageData(batchDataIterator);
+              batchDataIterator.reset();
+              assertTrue(batchDataIterator.hasNext());
             }
             loopTime++;
           }
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
new file mode 100644
index 0000000..963b17a2
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorABDeviceIT {
+  private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+  private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // see tearDown
+    EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    createAlignedTimeseries(); // registers vector1 with sub-measurements s1 and s2
+    prepareAlignedTimeSeriesData(); // aligned rows for vector1, timestamps 1..100
+    prepareNonAlignedTimeSeriesData(); // plain series s3..s5, timestamps 1..100
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception { // NOTE(review): strategy is set, not restored
+    session.close();
+    EnvironmentUtils.cleanEnv();
+    CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  @Test
+  public void subMeasurementAlignByDeviceTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select vector1.s1, vector1.s2 from root.sg1.d1 limit 1 align by device");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("Time", dataSet.getColumnNames().get(0));
+      assertEquals("Device", dataSet.getColumnNames().get(1));
+      assertEquals("vector1.s1", dataSet.getColumnNames().get(2));
+      assertEquals("vector1.s2", dataSet.getColumnNames().get(3));
+      // "limit 1" must produce exactly one row; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(1, rowRecord.getTimestamp());
+      assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+      assertEquals(2, rowRecord.getFields().get(1).getLongV());
+      assertEquals(3, rowRecord.getFields().get(2).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAlignByDeviceTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select vector1.* from root.sg1.d1 limit 1 align by device");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("Time", dataSet.getColumnNames().get(0));
+      assertEquals("Device", dataSet.getColumnNames().get(1));
+      assertEquals("vector1.s1", dataSet.getColumnNames().get(2));
+      assertEquals("vector1.s2", dataSet.getColumnNames().get(3));
+      // "limit 1" must produce exactly one row; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(1, rowRecord.getTimestamp());
+      assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+      assertEquals(2, rowRecord.getFields().get(1).getLongV());
+      assertEquals(3, rowRecord.getFields().get(2).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // NOTE(review): formerly labelled "Ignore until the tablet interface", yet no @Ignore is set.
+  @Test
+  public void vectorAlignByDeviceWithWildcardTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select ** from root.sg1.d1 limit 1 align by device");
+      assertEquals(7, dataSet.getColumnNames().size());
+      assertEquals("Time", dataSet.getColumnNames().get(0));
+      assertEquals("Device", dataSet.getColumnNames().get(1));
+      assertEquals("s3", dataSet.getColumnNames().get(2));
+      assertEquals("s4", dataSet.getColumnNames().get(3));
+      assertEquals("s5", dataSet.getColumnNames().get(4));
+      assertEquals("vector1.s1", dataSet.getColumnNames().get(5));
+      assertEquals("vector1.s2", dataSet.getColumnNames().get(6));
+
+      // "limit 1" must produce exactly one row; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(1, rowRecord.getTimestamp());
+      assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+      assertEquals(2, rowRecord.getFields().get(4).getLongV());
+      assertEquals(3, rowRecord.getFields().get(5).getLongV());
+      assertEquals(4, rowRecord.getFields().get(1).getLongV());
+      assertEquals(5, rowRecord.getFields().get(2).getLongV());
+      assertEquals(6, rowRecord.getFields().get(3).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationAlignByDeviceTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select count(vector1.*) from root.sg1.d1 align by device");
+      assertEquals(3, dataSet.getColumnNames().size());
+      assertEquals("Device", dataSet.getColumnNames().get(0));
+      assertEquals("count(vector1.s1)", dataSet.getColumnNames().get(1));
+      assertEquals("count(vector1.s2)", dataSet.getColumnNames().get(2));
+      // Exactly one row (one device) is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals("root.sg1.d1", rowRecord.getFields().get(0).getBinaryV().toString());
+      assertEquals(100, rowRecord.getFields().get(1).getLongV());
+      assertEquals(100, rowRecord.getFields().get(2).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void createAlignedTimeseries()
+      throws StatementExecutionException, IoTDBConnectionException {
+    // Sub-measurements s1 and s2 share one data type (INT64) and one encoding (RLE), so a
+    // single loop fills the three parallel lists that describe the aligned series under
+    // ROOT_SG1_D1_VECTOR1.
+    List<String> names = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (int index = 1; index <= 2; index++) {
+      names.add("s" + index);
+      types.add(TSDataType.INT64);
+      encodings.add(TSEncoding.RLE);
+    }
+    session.createAlignedTimeseries(
+        ROOT_SG1_D1_VECTOR1, names, types, encodings, CompressionType.SNAPPY, null);
+  }
+
+  private static void prepareAlignedTimeSeriesData()
+      throws StatementExecutionException, IoTDBConnectionException {
+    // Aligned INT64 sub-measurements: at timestamp t, s1 holds t + 1 and s2 holds t + 2.
+    List<String> names = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    for (int index = 1; index <= 2; index++) {
+      names.add("s" + index);
+      types.add(TSDataType.INT64);
+    }
+    for (long timestamp = 1; timestamp <= 100; timestamp++) {
+      List<Object> row = new ArrayList<>();
+      row.add(timestamp + 1);
+      row.add(timestamp + 2);
+      session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, timestamp, names, types, row);
+    }
+  }
+
+  private static void prepareNonAlignedTimeSeriesData()
+      throws IoTDBConnectionException, StatementExecutionException {
+    // Plain INT64 series s3, s4 and s5 under ROOT_SG1_D1, written with insertRecord:
+    // at timestamp t, series sN holds t + N.
+    List<String> sensorNames = new ArrayList<>();
+    List<TSDataType> sensorTypes = new ArrayList<>();
+    for (int sensor = 3; sensor <= 5; sensor++) {
+      sensorNames.add("s" + sensor);
+      sensorTypes.add(TSDataType.INT64);
+    }
+
+    for (long timestamp = 1; timestamp <= 100; timestamp++) {
+      List<Object> row = new ArrayList<>();
+      for (long offset = 3L; offset <= 5L; offset++) {
+        row.add(timestamp + offset);
+      }
+      session.insertRecord(ROOT_SG1_D1, timestamp, sensorNames, sensorTypes, row);
+    }
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
new file mode 100644
index 0000000..597061b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorAggregationIT {
+
+  private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+  private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // see tearDown
+    EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    prepareAlignedTimeseriesData(); // tablet insert for vector1 (s1, s2), followed by a flush
+    prepareNonAlignedTimeSeriesData(); // plain series s3..s5, timestamps 1..100
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception { // NOTE(review): strategy is set, not restored
+    session.close();
+    EnvironmentUtils.cleanEnv();
+    CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  @Test
+  public void vectorAggregationCountTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select count(s1), count(s2) from root.sg1.d1.vector1");
+      assertEquals(2, dataSet.getColumnNames().size());
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(100, rowRecord.getFields().get(0).getLongV());
+      assertEquals(100, rowRecord.getFields().get(1).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationSumAvgTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select sum(s1), avg(s2) from root.sg1.d1.vector1");
+      assertEquals(2, dataSet.getColumnNames().size());
+      assertEquals("sum(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("avg(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(5150, rowRecord.getFields().get(0).getDoubleV(), 0.01);
+      assertEquals(52.5, rowRecord.getFields().get(1).getDoubleV(), 0.01);
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationMinMaxTimeTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select min_time(s1), min_time(s2), max_time(s1), max_time(s2) from root.sg1.d1.vector1");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(2));
+      assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(1, rowRecord.getFields().get(0).getLongV());
+      assertEquals(1, rowRecord.getFields().get(1).getLongV());
+      assertEquals(100, rowRecord.getFields().get(2).getLongV());
+      assertEquals(100, rowRecord.getFields().get(3).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationMinMaxValueTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select min_value(s1), max_value(s2) from root.sg1.d1.vector1");
+      assertEquals(2, dataSet.getColumnNames().size());
+      assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(2, rowRecord.getFields().get(0).getLongV());
+      assertEquals(102, rowRecord.getFields().get(1).getIntV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationFirstLastValueTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select first_value(s1), last_value(s2) from root.sg1.d1.vector1");
+      assertEquals(2, dataSet.getColumnNames().size());
+      assertEquals("first_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("last_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(2, rowRecord.getFields().get(0).getLongV());
+      assertEquals(102, rowRecord.getFields().get(1).getIntV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** Tests querying vector time series and non-aligned time series together. */
+  @Test
+  public void vectorComplexTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select count(vector1.s1), max_value(s3), count(vector1.s2), min_time(s4) from root.sg1.d1");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("max_value(" + ROOT_SG1_D1 + ".s3)", dataSet.getColumnNames().get(1));
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(2));
+      assertEquals("min_time(" + ROOT_SG1_D1 + ".s4)", dataSet.getColumnNames().get(3));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(100, rowRecord.getFields().get(0).getLongV());
+      assertEquals(103, rowRecord.getFields().get(1).getLongV());
+      assertEquals(100, rowRecord.getFields().get(2).getLongV());
+      assertEquals(1, rowRecord.getFields().get(3).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** Inserts rows 1..100 of vector1 (s1 = t + 1, s2 = (int) (t + 2)) via a Tablet, then flushes. */
+  private static void prepareAlignedTimeseriesData()
+      throws IoTDBConnectionException, StatementExecutionException {
+    // A single vector schema "vector1" with sub-measurements s1 (INT64) and s2 (INT32).
+    // Only the measurement id and the data type of a schema take effect in a Tablet.
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(
+        new VectorMeasurementSchema(
+            "vector1",
+            new String[] {"s1", "s2"},
+            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+
+    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
+    tablet.setAligned(true);
+
+    for (long row = 1; row <= 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, row);
+      tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(0), rowIndex, row + 1);
+      tablet.addValue(schemaList.get(0).getSubMeasurementsList().get(1), rowIndex, (int) (row + 2));
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) { // tablet is full: send it and start over
+        session.insertTablet(tablet, true);
+        tablet.reset();
+      }
+    }
+
+    if (tablet.rowSize != 0) { // send whatever is left in a partially filled tablet
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+    session.executeNonQueryStatement("flush");
+  }
+
+  private static void prepareNonAlignedTimeSeriesData()
+      throws IoTDBConnectionException, StatementExecutionException {
+    // Plain INT64 series s3, s4 and s5 under ROOT_SG1_D1, written with insertRecord:
+    // at timestamp t, series sN holds t + N.
+    List<String> sensorNames = new ArrayList<>();
+    List<TSDataType> sensorTypes = new ArrayList<>();
+    for (int sensor = 3; sensor <= 5; sensor++) {
+      sensorNames.add("s" + sensor);
+      sensorTypes.add(TSDataType.INT64);
+    }
+
+    for (long timestamp = 1; timestamp <= 100; timestamp++) {
+      List<Object> row = new ArrayList<>();
+      for (long offset = 3L; offset <= 5L; offset++) {
+        row.add(timestamp + offset);
+      }
+      session.insertRecord(ROOT_SG1_D1, timestamp, sensorNames, sensorTypes, row);
+    }
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
new file mode 100644
index 0000000..c2fac60
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationWithUnSeqIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBSessionVectorAggregationWithUnSeqIT {
+
+  private static final String ROOT_SG1_D1_VECTOR1 = "root.sg1.d1.vector1";
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); // see tearDown
+    EnvironmentUtils.envSetUp();
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    createAlignedTimeseries(); // registers vector1 with sub-measurements s1 and s2
+    prepareAlignedTimeseriesDataWithUnSeq(); // three flushed batches with overlapping timestamps
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception { // NOTE(review): strategy is set, not restored
+    session.close();
+    EnvironmentUtils.cleanEnv();
+    CONFIG.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  @Test
+  public void vectorAggregationCountTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select count(s1), count(s2) from root.sg1.d1.vector1");
+      assertEquals(2, dataSet.getColumnNames().size());
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("count(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(200, rowRecord.getFields().get(0).getLongV());
+      assertEquals(200, rowRecord.getFields().get(1).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationMinMaxTimeTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select min_time(s1), max_time(s1), min_time(s2), max_time(s2) from root.sg1.d1.vector1");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(1));
+      assertEquals("min_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(2));
+      assertEquals("max_time(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(1, rowRecord.getFields().get(0).getLongV());
+      assertEquals(200, rowRecord.getFields().get(1).getLongV());
+      assertEquals(1, rowRecord.getFields().get(2).getLongV());
+      assertEquals(200, rowRecord.getFields().get(3).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void vectorAggregationMinMaxValueTest() {
+    try {
+      SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select min_value(s1), min_value(s2), max_value(s1), max_value(s2) from root.sg1.d1.vector1");
+      assertEquals(4, dataSet.getColumnNames().size());
+      assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(0));
+      assertEquals("min_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(1));
+      assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s1)", dataSet.getColumnNames().get(2));
+      assertEquals("max_value(" + ROOT_SG1_D1_VECTOR1 + ".s2)", dataSet.getColumnNames().get(3));
+      // Exactly one aggregate row is expected; an empty result may not pass silently.
+      org.junit.Assert.assertTrue("expected one result row", dataSet.hasNext());
+      RowRecord rowRecord = dataSet.next();
+      assertEquals(2, rowRecord.getFields().get(0).getLongV());
+      assertEquals(3, rowRecord.getFields().get(1).getLongV());
+      assertEquals(201, rowRecord.getFields().get(2).getLongV());
+      assertEquals(202, rowRecord.getFields().get(3).getLongV());
+      org.junit.Assert.assertFalse("unexpected extra row", dataSet.hasNext());
+
+      dataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void createAlignedTimeseries()
+      throws StatementExecutionException, IoTDBConnectionException {
+    // Sub-measurements s1 and s2 share one data type (INT64) and one encoding (RLE), so a
+    // single loop fills the three parallel lists that describe the aligned series under
+    // ROOT_SG1_D1_VECTOR1.
+    List<String> names = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (int index = 1; index <= 2; index++) {
+      names.add("s" + index);
+      types.add(TSDataType.INT64);
+      encodings.add(TSEncoding.RLE);
+    }
+    session.createAlignedTimeseries(
+        ROOT_SG1_D1_VECTOR1, names, types, encodings, CompressionType.SNAPPY, null);
+  }
+
+  private static void prepareAlignedTimeseriesDataWithUnSeq()
+      throws StatementExecutionException, IoTDBConnectionException {
+    List<String> names = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    for (int index = 1; index <= 2; index++) {
+      names.add("s" + index);
+      types.add(TSDataType.INT64);
+    }
+
+    // Three separately flushed batches. The first covers timestamps 1..200; the other
+    // two write again to timestamps inside that range (2..50 step 2, 150..200 step 10),
+    // using the same t + 1 / t + 2 values.
+    insertRowsAndFlush(1, 200, 1, names, types);
+    insertRowsAndFlush(2, 50, 2, names, types);
+    insertRowsAndFlush(150, 200, 10, names, types);
+  }
+
+  /**
+   * Inserts one aligned record per timestamp {@code t = first, first + step, ...} while
+   * {@code t <= last}, with s1 = t + 1 and s2 = t + 2, and then issues a "flush" statement.
+   */
+  private static void insertRowsAndFlush(
+      long first, long last, long step, List<String> names, List<TSDataType> types)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (long time = first; time <= last; time += step) {
+      List<Object> row = new ArrayList<>();
+      row.add(time + 1);
+      row.add(time + 2);
+      session.insertAlignedRecord(ROOT_SG1_D1_VECTOR1, time, names, types, row);
+    }
+
+    session.executeNonQueryStatement("flush");
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
similarity index 98%
rename from session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java
rename to session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index 58cc28d..877ebb5 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.session;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -40,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /** use session interface to IT for vector timeseries insert and select Black-box Testing */
-public class IoTDBSessionVectorIT {
+public class IoTDBSessionVectorInsertIT {
   private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
   private static final String ROOT_SG1_D1 = "root.sg_1.d1";
   private static final String ROOT_SG1_D2 = "root.sg_1.d2";
@@ -64,7 +65,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedTabletTest() {
+  public void testInsertAlignedTablet() {
     try {
       insertTabletWithAlignedTimeseriesMethod();
       session.executeNonQueryStatement("flush");
@@ -89,7 +90,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedSingleSelectTest() {
+  public void testInsertAlignedRecord() {
     try {
       insertAlignedRecord(ROOT_SG1_D1_VECTOR1);
       session.executeNonQueryStatement("flush");
@@ -111,7 +112,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedStringSingleSelectTest() {
+  public void testInsertAlignedStringRecord() {
     try {
       insertAlignedStringRecord(ROOT_SG1_D1_VECTOR1);
       session.executeNonQueryStatement("flush");
@@ -133,7 +134,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedStringRecordsSingleSelectTest() {
+  public void testInsertAlignedStringRecords() {
     try {
       insertAlignedStringRecords(ROOT_SG1_D1_VECTOR1);
       session.executeNonQueryStatement("flush");
@@ -155,7 +156,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedRecordsSingleSelectTest() {
+  public void testInsertAlignedRecords() {
     try {
       insertAlignedRecords(ROOT_SG1_D1_VECTOR1);
       session.executeNonQueryStatement("flush");
@@ -177,7 +178,7 @@ public class IoTDBSessionVectorIT {
   }
 
   @Test
-  public void alignedRecordsOfOneDeviceSingleSelectTest() {
+  public void testInsertAlignedRecordsOfOneDevice() {
     try {
       insertAlignedRecordsOfOneDevice(ROOT_SG1_D1_VECTOR1);
       session.executeNonQueryStatement("flush");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 6558da0..2bfda53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -49,6 +49,10 @@ public class VectorChunkMetadata implements IChunkMetadata {
         : timeChunkMetadata.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valueChunkMetadataList.get(index).getStatistics();
+  }
+
   @Override
   public boolean isModified() {
     return timeChunkMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index 87e88af..c3d727f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -49,6 +49,10 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
         : timeseriesMetadata.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valueTimeseriesMetadataList.get(index).getStatistics();
+  }
+
   @Override
   public boolean isModified() {
     return timeseriesMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 1005f34..4cb8f17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsBinary;
@@ -170,6 +170,10 @@ public class BatchData {
     return dataType;
   }
 
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+
   public BatchDataType getBatchDataType() {
     return batchDataType;
   }
@@ -633,7 +637,12 @@ public class BatchData {
   }
 
   public BatchDataIterator getBatchDataIterator() {
-    return new BatchDataIterator(this);
+    return new BatchDataIterator();
+  }
+
+  /** Only used for the batch data of vector time series. */
+  public IBatchDataIterator getBatchDataIterator(int subIndex) {
+    return new VectorBatchDataIterator(subIndex);
   }
 
   /**
@@ -692,4 +701,75 @@ public class BatchData {
       }
     }
   }
+
+  /** Cursor over the enclosing BatchData; serves both IPointReader and IBatchDataIterator. */
+  private class BatchDataIterator implements IPointReader, IBatchDataIterator {
+    @Override
+    public boolean hasNext() {
+      return BatchData.this.hasCurrent();
+    }
+
+    @Override
+    public void next() {
+      BatchData.this.next();
+    }
+
+    @Override
+    public long currentTime() {
+      return BatchData.this.currentTime();
+    }
+
+    @Override
+    public Object currentValue() {
+      return BatchData.this.currentValue();
+    }
+
+    @Override
+    public int totalLength() {
+      return BatchData.this.length();
+    }
+
+    @Override
+    public void reset() {
+      BatchData.this.resetBatchData();
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      return hasNext();
+    }
+
+    @Override
+    public TimeValuePair currentTimeValuePair() {
+      return new TimeValuePair(currentTime(), currentTsPrimitiveType());
+    }
+
+    @Override
+    public TimeValuePair nextTimeValuePair() {
+      TimeValuePair current = currentTimeValuePair();
+      next();
+      return current;
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** Iterator over a single sub-column ({@code subIndex}) of vector batch data. */
+  private class VectorBatchDataIterator extends BatchDataIterator {
+
+    private final int subIndex;
+
+    private VectorBatchDataIterator(int subIndex) {
+      this.subIndex = subIndex;
+    }
+
+    /** Returns the sub-column value; null if the batch is not VECTOR or the cell is empty. */
+    @Override
+    public Object currentValue() {
+      TsPrimitiveType cell = dataType == TSDataType.VECTOR ? getVector()[subIndex] : null;
+      // NOTE(review): a row's slot for this sub-column may be null; return null, do not NPE.
+      return cell == null ? null : cell.getValue();
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
index ed6e6c2..ca14066 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.read.common;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.util.LinkedList;
 
@@ -70,6 +71,10 @@ public class DescReadWriteBatchData extends DescReadBatchData {
         binaryRet = new LinkedList<>();
         binaryRet.add(new Binary[capacity]);
         break;
+      case VECTOR:
+        vectorRet = new LinkedList<>();
+        vectorRet.add(new TsPrimitiveType[capacity][]);
+        break;
       default:
         throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
@@ -297,6 +302,43 @@ public class DescReadWriteBatchData extends DescReadBatchData {
     count++;
   }
 
+  /**
+   * Puts one vector row; slots are filled from the last index down to 0, growing when full.
+   *
+   * @param t timestamp
+   * @param v vector data; the array reference is stored as-is, not copied.
+   */
+  @Override
+  public void putVector(long t, TsPrimitiveType[] v) {
+    if (writeCurArrayIndex == -1) {
+      if (capacity >= CAPACITY_THRESHOLD) {
+        ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]);
+        ((LinkedList<TsPrimitiveType[][]>) vectorRet).addFirst(new TsPrimitiveType[capacity][]);
+        writeCurListIndex++;
+        writeCurArrayIndex = capacity - 1;
+      } else {
+        int newCapacity = capacity << 1;
+        // Below CAPACITY_THRESHOLD the head arrays are doubled instead of prepending a new one.
+        long[] newTimeData = new long[newCapacity];
+        TsPrimitiveType[][] newValueData = new TsPrimitiveType[newCapacity][];
+        // Old contents are copied to the tail so the free slots sit at the low indexes.
+        System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+        System.arraycopy(vectorRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+        timeRet.set(0, newTimeData);
+        vectorRet.set(0, newValueData);
+        // The next free slot is the one just below the copied region.
+        writeCurArrayIndex = newCapacity - capacity - 1;
+        capacity = newCapacity;
+      }
+    }
+    timeRet.get(0)[writeCurArrayIndex] = t;
+    vectorRet.get(0)[writeCurArrayIndex] = v;
+
+    writeCurArrayIndex--;
+    count++;
+  }
+
   @Override
   public boolean hasCurrent() {
     return (readCurListIndex == 0 && readCurArrayIndex > writeCurArrayIndex)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
new file mode 100644
index 0000000..756aaff
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+public interface IBatchDataIterator {
+  /** Whether a current point is available; BatchData's iterator maps this to hasCurrent(). */
+  boolean hasNext();
+  /** Advances the iterator to the following point. */
+  void next();
+  /** Returns the timestamp of the current point. */
+  long currentTime();
+  /** Returns the value of the current point. */
+  Object currentValue();
+  /** Resets the iteration state; BatchData's iterator maps this to resetBatchData(). */
+  void reset();
+  /** Returns the total number of points; BatchData's iterator maps this to length(). */
+  int totalLength();
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java
deleted file mode 100644
index ae02551..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.reader;
-
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-
-public class BatchDataIterator implements IPointReader {
-
-  private BatchData batchData;
-
-  public BatchDataIterator(BatchData batchData) {
-    this.batchData = batchData;
-  }
-
-  @Override
-  public boolean hasNextTimeValuePair() {
-    return batchData.hasCurrent();
-  }
-
-  @Override
-  public TimeValuePair nextTimeValuePair() {
-    TimeValuePair timeValuePair =
-        new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
-    batchData.next();
-    return timeValuePair;
-  }
-
-  @Override
-  public TimeValuePair currentTimeValuePair() {
-    return new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
-  }
-
-  @Override
-  public void close() {
-    batchData = null;
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
index 09a2ab7..4c5290c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
@@ -101,7 +101,7 @@ public class VectorPageReader implements IPageReader {
         pageData.putVector(timeBatch[i], v);
       }
     }
-    return pageData;
+    return pageData.flip();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
@@ -117,6 +117,10 @@ public class VectorPageReader implements IPageReader {
         : timePageReader.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valuePageReaderList.get(index).getStatistics();
+  }
+
   @Override
   public void setFilter(Filter filter) {
     this.filter = filter;