You are viewing a plain text version of this content; the canonical (HTML) version is available from the commits@iotdb.apache.org mailing-list archive.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/09/16 09:07:43 UTC

[iotdb] 03/05: implement update by statistics

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggrVector2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c38658d785a1fd7477c05bd711b2bb15067d0538
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Sep 16 15:08:20 2021 +0800

    implement update by statistics
---
 .../db/query/executor/AggregationExecutor.java     | 238 +++++++++++++++++++--
 .../db/query/reader/series/IAggregateReader.java   |   2 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  38 ++++
 .../reader/series/VectorSeriesAggregateReader.java | 169 +++++++++++++++
 .../tsfile/file/metadata/VectorChunkMetadata.java  |   4 +
 .../file/metadata/VectorTimeSeriesMetadata.java    |   4 +
 .../tsfile/read/reader/page/VectorPageReader.java  |   4 +
 7 files changed, 443 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 871915a..a750a62 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
@@ -107,7 +108,7 @@ public class AggregationExecutor {
         StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
 
     // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
-    Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
+    Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
         groupVectorSeries(pathToAggrIndexesMap);
     try {
       for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
@@ -118,8 +119,7 @@ public class AggregationExecutor {
             aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
             timeFilter);
       }
-      for (Map.Entry<PartialPath, Map<String, List<Integer>>> entry :
-          vectorPathIndexesMap.entrySet()) {
+      for (Map.Entry<PartialPath, List<List<Integer>>> entry : vectorPathIndexesMap.entrySet()) {
         VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey();
         aggregateOneVectorSeries(
             vectorSeries,
@@ -150,7 +150,6 @@ public class AggregationExecutor {
     boolean[] isAsc = new boolean[aggregateResultList.length];
 
     TSDataType tsDataType = dataTypes.get(indexes.get(0));
-
     for (int i : indexes) {
       // construct AggregateResult
       AggregateResult aggregateResult =
@@ -183,11 +182,56 @@ public class AggregationExecutor {
   }
 
   protected void aggregateOneVectorSeries(
-      PartialPath seriesPath,
-      Map<String, List<Integer>> subIndexes,
+      VectorPartialPath seriesPath,
+      List<List<Integer>> subIndexes,
       Set<String> allMeasurementsInDevice,
       Filter timeFilter)
-      throws IOException, QueryProcessException, StorageEngineException {}
+      throws IOException, QueryProcessException, StorageEngineException {
+    List<List<AggregateResult>> ascAggregateResultList = new ArrayList<>();
+    List<List<AggregateResult>> descAggregateResultList = new ArrayList<>();
+    boolean[] isAsc = new boolean[aggregateResultList.length];
+
+    for (List<Integer> subIndex : subIndexes) {
+      TSDataType tsDataType = dataTypes.get(subIndex.get(0));
+      List<AggregateResult> subAscResultList = new ArrayList<>();
+      List<AggregateResult> subDescResultList = new ArrayList<>();
+      for (int i : subIndex) {
+        // construct AggregateResult
+        AggregateResult aggregateResult =
+            AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
+        if (aggregateResult.isAscending()) {
+          subAscResultList.add(aggregateResult);
+          isAsc[i] = true;
+        } else {
+          subDescResultList.add(aggregateResult);
+        }
+      }
+      ascAggregateResultList.add(subAscResultList);
+      descAggregateResultList.add(subDescResultList);
+    }
+
+    aggregateOneVectorSeries(
+        seriesPath,
+        allMeasurementsInDevice,
+        context,
+        timeFilter,
+        TSDataType.VECTOR,
+        ascAggregateResultList,
+        descAggregateResultList,
+        null);
+
+    for (int i = 0; i < subIndexes.size(); i++) {
+      List<Integer> subIndex = subIndexes.get(i);
+      List<AggregateResult> subAscResultList = ascAggregateResultList.get(i);
+      List<AggregateResult> subDescResultList = descAggregateResultList.get(i);
+      int ascIndex = 0;
+      int descIndex = 0;
+      for (int index : subIndex) {
+        aggregateResultList[index] =
+            isAsc[index] ? subAscResultList.get(ascIndex++) : subDescResultList.get(descIndex++);
+      }
+    }
+  }
 
   @SuppressWarnings("squid:S107")
   public static void aggregateOneSeries(
@@ -240,6 +284,56 @@ public class AggregationExecutor {
     }
   }
 
+  public static void aggregateOneVectorSeries(
+      VectorPartialPath seriesPath,
+      Set<String> measurements,
+      QueryContext context,
+      Filter timeFilter,
+      TSDataType tsDataType,
+      List<List<AggregateResult>> ascAggregateResultList,
+      List<List<AggregateResult>> descAggregateResultList,
+      TsFileFilter fileFilter)
+      throws StorageEngineException, IOException, QueryProcessException {
+
+    // construct series reader without value filter
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    if (fileFilter != null) {
+      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
+    }
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    if (ascAggregateResultList != null && !ascAggregateResultList.isEmpty()) {
+      VectorSeriesAggregateReader seriesReader =
+          new VectorSeriesAggregateReader(
+              seriesPath,
+              measurements,
+              tsDataType,
+              context,
+              queryDataSource,
+              timeFilter,
+              null,
+              null,
+              true);
+      aggregateFromVectorReader(seriesReader, ascAggregateResultList);
+    }
+    if (descAggregateResultList != null && !descAggregateResultList.isEmpty()) {
+      VectorSeriesAggregateReader seriesReader =
+          new VectorSeriesAggregateReader(
+              seriesPath,
+              measurements,
+              tsDataType,
+              context,
+              queryDataSource,
+              timeFilter,
+              null,
+              null,
+              false);
+      aggregateFromVectorReader(seriesReader, descAggregateResultList);
+    }
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private static void aggregateFromReader(
       IAggregateReader seriesReader, List<AggregateResult> aggregateResultList)
@@ -285,6 +379,72 @@ public class AggregationExecutor {
     }
   }
 
+  private static void aggregateFromVectorReader(
+      VectorSeriesAggregateReader seriesReader, List<List<AggregateResult>> aggregateResultList)
+      throws QueryProcessException, IOException {
+    int remainingToCalculate = 0;
+    List<boolean[]> isCalculatedArray = new ArrayList<>();
+    for (List<AggregateResult> subAggregateResults : aggregateResultList) {
+      remainingToCalculate += subAggregateResults.size();
+      boolean[] subCalculatedArray = new boolean[subAggregateResults.size()];
+      isCalculatedArray.add(subCalculatedArray);
+    }
+
+    while (seriesReader.hasNextFile()) {
+      // cal by file statistics
+      if (seriesReader.canUseCurrentFileStatistics()) {
+        while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
+          Statistics fileStatistics = seriesReader.currentFileStatistics();
+          remainingToCalculate =
+              aggregateStatistics(
+                  aggregateResultList.get(seriesReader.getCurIndex()),
+                  isCalculatedArray.get(seriesReader.getCurIndex()),
+                  remainingToCalculate,
+                  fileStatistics);
+          if (remainingToCalculate == 0) {
+            seriesReader.resetIndex();
+            return;
+          }
+          seriesReader.nextIndex();
+        }
+        seriesReader.skipCurrentFile();
+        continue;
+      }
+
+      while (seriesReader.hasNextChunk()) {
+        // cal by chunk statistics
+        if (seriesReader.canUseCurrentChunkStatistics()) {
+          while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
+            Statistics chunkStatistics = seriesReader.currentChunkStatistics();
+            remainingToCalculate =
+                aggregateStatistics(
+                    aggregateResultList.get(seriesReader.getCurIndex()),
+                    isCalculatedArray.get(seriesReader.getCurIndex()),
+                    remainingToCalculate,
+                    chunkStatistics);
+            if (remainingToCalculate == 0) {
+              seriesReader.resetIndex();
+              return;
+            }
+            seriesReader.nextIndex();
+          }
+          seriesReader.skipCurrentChunk();
+          continue;
+        }
+
+        remainingToCalculate =
+            aggregateVectorPages(
+                seriesReader,
+                aggregateResultList.get(seriesReader.getCurIndex()),
+                isCalculatedArray.get(seriesReader.getCurIndex()),
+                remainingToCalculate);
+        if (remainingToCalculate == 0) {
+          return;
+        }
+      }
+    }
+  }
+
   /** Aggregate each result in the list with the statistics */
   private static int aggregateStatistics(
       List<AggregateResult> aggregateResultList,
@@ -348,6 +508,48 @@ public class AggregationExecutor {
     return remainingToCalculate;
   }
 
+  private static int aggregateVectorPages(
+      VectorSeriesAggregateReader seriesReader,
+      List<AggregateResult> aggregateResultList,
+      boolean[] isCalculatedArray,
+      int remainingToCalculate)
+      throws IOException, QueryProcessException {
+    while (seriesReader.hasNextPage()) {
+      // cal by page statistics
+      if (seriesReader.canUseCurrentPageStatistics()) {
+        while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
+          Statistics pageStatistic = seriesReader.currentPageStatistics();
+          remainingToCalculate =
+              aggregateStatistics(
+                  aggregateResultList, isCalculatedArray, remainingToCalculate, pageStatistic);
+          if (remainingToCalculate == 0) {
+            seriesReader.resetIndex();
+            return 0;
+          }
+          seriesReader.nextIndex();
+        }
+        seriesReader.skipCurrentPage();
+        continue;
+      }
+      BatchData nextOverlappedPageData = seriesReader.nextPage();
+      for (int i = 0; i < aggregateResultList.size(); i++) {
+        if (!isCalculatedArray[i]) {
+          AggregateResult aggregateResult = aggregateResultList.get(i);
+          aggregateResult.updateResultFromPageData(nextOverlappedPageData);
+          nextOverlappedPageData.resetBatchData();
+          if (aggregateResult.hasFinalResult()) {
+            isCalculatedArray[i] = true;
+            remainingToCalculate--;
+            if (remainingToCalculate == 0) {
+              return 0;
+            }
+          }
+        }
+      }
+    }
+    return remainingToCalculate;
+  }
+
   /** execute aggregate function with value filter. */
   public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
@@ -521,19 +723,25 @@ public class AggregationExecutor {
 
   /**
    * Group all the subSensors of one vector into one VectorPartialPath and Remove vectorPartialPath
-   * from pathToAggrIndexesMap.
-   *
-   * @return e.g. vector[s1, s2], Map{s1 -> 1, s2 -> 2}
+   * from pathToAggrIndexesMap. For example, input map: vector1[s1] -> [1, 3], vector1[s2] -> [2,4],
+   * will return vector1[s1,s2], [[1,3], [2,4]]
    */
-  private Map<PartialPath, Map<String, List<Integer>>> groupVectorSeries(
+  private Map<PartialPath, List<List<Integer>>> groupVectorSeries(
       Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
-    Map<PartialPath, Map<String, List<Integer>>> result = new HashMap<>();
+    Map<PartialPath, List<List<Integer>>> result = new HashMap<>();
+    Map<String, VectorPartialPath> temp = new HashMap<>();
+
     for (PartialPath seriesPath : pathToAggrIndexesMap.keySet()) {
       if (seriesPath instanceof VectorPartialPath) {
         List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
-        result
-            .computeIfAbsent(seriesPath, key -> new HashMap<>())
-            .put(((VectorPartialPath) seriesPath).getSubSensor(0), indexes);
+        VectorPartialPath groupPath = temp.get(seriesPath.getFullPath());
+        if (groupPath == null) {
+          groupPath = (VectorPartialPath) seriesPath.copy();
+          temp.put(seriesPath.getFullPath(), groupPath);
+        } else {
+          groupPath.addSubSensor(((VectorPartialPath) seriesPath).getSubSensorsList());
+        }
+        result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
       }
     }
     return result;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index 867ad9d..c48b88e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -37,7 +37,7 @@ public interface IAggregateReader {
 
   boolean canUseCurrentChunkStatistics() throws IOException;
 
-  Statistics currentChunkStatistics();
+  Statistics currentChunkStatistics() throws IOException;
 
   void skipCurrentChunk();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index ebd3db0..b6f2d3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -43,6 +45,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -273,6 +276,13 @@ public class SeriesReader {
     return firstTimeSeriesMetadata.getStatistics();
   }
 
+  Statistics currentFileStatistics(int index) throws IOException {
+    if (!(firstTimeSeriesMetadata instanceof VectorTimeSeriesMetadata)) {
+      throw new IOException("Can only get statistics by index from vectorTimeSeriesMetaData");
+    }
+    return ((VectorTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
+  }
+
   boolean currentFileModified() throws IOException {
     if (firstTimeSeriesMetadata == null) {
       throw new IOException("no first file");
@@ -394,6 +404,13 @@ public class SeriesReader {
     return firstChunkMetadata.getStatistics();
   }
 
+  Statistics currentChunkStatistics(int index) throws IOException {
+    if (!(firstChunkMetadata instanceof VectorChunkMetadata)) {
+      throw new IOException("Can only get statistics by index from vectorChunkMetaData");
+    }
+    return ((VectorChunkMetadata) firstChunkMetadata).getStatistics(index);
+  }
+
   boolean currentChunkModified() throws IOException {
     if (firstChunkMetadata == null) {
       throw new IOException("no first chunk");
@@ -615,6 +632,16 @@ public class SeriesReader {
     return firstPageReader.getStatistics();
   }
 
+  Statistics currentPageStatistics(int index) throws IOException {
+    if (firstPageReader == null) {
+      return null;
+    }
+    if (!(firstPageReader.isVectorPageReader())) {
+      throw new IOException("Can only get statistics by index from VectorPageReader");
+    }
+    return firstPageReader.getStatistics(index);
+  }
+
   boolean currentPageModified() throws IOException {
     if (firstPageReader == null) {
       throw new IOException("no first page");
@@ -1023,10 +1050,21 @@ public class SeriesReader {
       this.isSeq = isSeq;
     }
 
+    public boolean isVectorPageReader() {
+      return data instanceof VectorPageReader;
+    }
+
     Statistics getStatistics() {
       return data.getStatistics();
     }
 
+    Statistics getStatistics(int index) throws IOException {
+      if (!(data instanceof VectorPageReader)) {
+        throw new IOException("Can only get statistics by index from VectorPageReader");
+      }
+      return ((VectorPageReader) data).getStatistics(index);
+    }
+
     BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
       return data.getAllSatisfiedPageData(ascending);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
new file mode 100644
index 0000000..5f35d45
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class VectorSeriesAggregateReader implements IAggregateReader {
+
+  private final SeriesReader seriesReader;
+  private int curIndex = 0;
+  private int subSensorSize;
+
+  public VectorSeriesAggregateReader(
+      VectorPartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    this.seriesReader =
+        new SeriesReader(
+            seriesPath,
+            allSensors,
+            dataType,
+            context,
+            dataSource,
+            timeFilter,
+            valueFilter,
+            fileFilter,
+            ascending);
+    this.subSensorSize = seriesPath.getSubSensorsList().size();
+  }
+
+  @Override
+  public boolean isAscending() {
+    return seriesReader.getOrderUtils().getAscending();
+  }
+
+  @Override
+  public boolean hasNextFile() throws IOException {
+    return seriesReader.hasNextFile();
+  }
+
+  @Override
+  public boolean canUseCurrentFileStatistics() throws IOException {
+    Statistics fileStatistics = currentFileStatistics();
+    return !seriesReader.isFileOverlapped()
+        && containedByTimeFilter(fileStatistics)
+        && !seriesReader.currentFileModified();
+  }
+
+  @Override
+  public Statistics currentFileStatistics() throws IOException {
+    return seriesReader.currentFileStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentFile() {
+    seriesReader.skipCurrentFile();
+  }
+
+  @Override
+  public boolean hasNextChunk() throws IOException {
+    return seriesReader.hasNextChunk();
+  }
+
+  @Override
+  public boolean canUseCurrentChunkStatistics() throws IOException {
+    Statistics chunkStatistics = currentChunkStatistics();
+    return !seriesReader.isChunkOverlapped()
+        && containedByTimeFilter(chunkStatistics)
+        && !seriesReader.currentChunkModified();
+  }
+
+  @Override
+  public Statistics currentChunkStatistics() throws IOException {
+    return seriesReader.currentChunkStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentChunk() {
+    seriesReader.skipCurrentChunk();
+  }
+
+  @Override
+  public boolean hasNextPage() throws IOException {
+    return seriesReader.hasNextPage();
+  }
+
+  @Override
+  public boolean canUseCurrentPageStatistics() throws IOException {
+    Statistics currentPageStatistics = currentPageStatistics();
+    if (currentPageStatistics == null) {
+      return false;
+    }
+    return !seriesReader.isPageOverlapped()
+        && containedByTimeFilter(currentPageStatistics)
+        && !seriesReader.currentPageModified();
+  }
+
+  @Override
+  public Statistics currentPageStatistics() throws IOException {
+    return seriesReader.currentPageStatistics(curIndex);
+  }
+
+  @Override
+  public void skipCurrentPage() {
+    seriesReader.skipCurrentPage();
+  }
+
+  @Override
+  public BatchData nextPage() throws IOException {
+    return seriesReader.nextPage().flip();
+  }
+
+  private boolean containedByTimeFilter(Statistics statistics) {
+    Filter timeFilter = seriesReader.getTimeFilter();
+    return timeFilter == null
+        || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
+  }
+
+  public int getCurIndex() {
+    return curIndex;
+  }
+
+  public int getSubSensorSize() {
+    return subSensorSize;
+  }
+
+  public void nextIndex() {
+    curIndex++;
+    if (curIndex > subSensorSize) {
+      resetIndex();
+    }
+  }
+
+  public void resetIndex() {
+    curIndex = 0;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 6558da0..2bfda53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -49,6 +49,10 @@ public class VectorChunkMetadata implements IChunkMetadata {
         : timeChunkMetadata.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valueChunkMetadataList.get(index).getStatistics();
+  }
+
   @Override
   public boolean isModified() {
     return timeChunkMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index 87e88af..c3d727f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -49,6 +49,10 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
         : timeseriesMetadata.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valueTimeseriesMetadataList.get(index).getStatistics();
+  }
+
   @Override
   public boolean isModified() {
     return timeseriesMetadata.isModified();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
index 09a2ab7..5d2b368 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java
@@ -117,6 +117,10 @@ public class VectorPageReader implements IPageReader {
         : timePageReader.getStatistics();
   }
 
+  public Statistics getStatistics(int index) {
+    return valuePageReaderList.get(index).getStatistics();
+  }
+
   @Override
   public void setFilter(Filter filter) {
     this.filter = filter;