You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/12 01:14:47 UTC

[incubator-iotdb] branch master updated: [IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f951bd3  [IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)
f951bd3 is described below

commit f951bd378c75b3b432612031fc8365891a0b0d9c
Author: Dawei Liu <13...@qq.com>
AuthorDate: Thu Dec 12 09:14:37 2019 +0800

    [IOTDB-306] new reader with ChunkMetaData for aggregateFunction (#614)
    
    * new hasNextChunk and hasNextPageInCurrentChunk for IAggregateReader
---
 .../db/query/aggregation/AggregateFunction.java    |   4 +
 .../db/query/aggregation/impl/AvgAggrFunc.java     |   7 ++
 .../db/query/aggregation/impl/CountAggrFunc.java   |   8 ++
 .../query/aggregation/impl/FirstValueAggrFunc.java |  15 +++
 .../query/aggregation/impl/LastValueAggrFunc.java  |   7 ++
 .../db/query/aggregation/impl/MaxTimeAggrFunc.java |   7 ++
 .../query/aggregation/impl/MaxValueAggrFunc.java   |   7 ++
 .../db/query/aggregation/impl/MinTimeAggrFunc.java |  10 ++
 .../query/aggregation/impl/MinValueAggrFunc.java   |   7 ++
 .../db/query/executor/AggregateEngineExecutor.java | 126 ++++++++++++---------
 .../iotdb/db/query/reader/IAggregateReader.java    |   6 +
 .../query/reader/chunkRelated/MemChunkReader.java  |  17 +++
 .../fileRelated/FileSeriesReaderAdapter.java       |  16 +++
 .../db/query/reader/universal/IterateReader.java   |  30 +++++
 .../read/reader/series/FileSeriesReader.java       |  40 ++++++-
 15 files changed, 253 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index 3fae3bd..0d0fb32 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -119,4 +120,7 @@ public abstract class AggregateFunction {
   public TSDataType getResultDataType() {
     return resultDataType;
   }
+
+  public abstract void calculateValueFromChunkMetaData(
+      ChunkMetaData chunkMetaData) throws QueryProcessException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
index 7888021..2f6a676 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -163,6 +164,12 @@ public class AvgAggrFunc extends AggregateFunction {
     return false;
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    sum += chunkMetaData.getStatistics().getSumValue();
+    cnt += chunkMetaData.getNumOfPoints();
+  }
+
   /**
    * Return type name of aggregation
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index cab4e90..a89d063 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.slf4j.Logger;
@@ -152,4 +153,11 @@ public class CountAggrFunc extends AggregateFunction {
   public boolean isCalculatedAggregationResult() {
     return false;
   }
+
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    long preValue = resultData.getLongRet();
+    preValue += chunkMetaData.getNumOfPoints();
+    resultData.setLongRet(preValue);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
index 6a64c6c..acf9d95 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -151,4 +152,18 @@ public class FirstValueAggrFunc extends AggregateFunction {
   public boolean isCalculatedAggregationResult() {
     return resultData.isSetTime();
   }
+
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData)
+      throws QueryProcessException {
+    if (resultData.isSetTime()) {
+      return;
+    }
+
+    Object firstVal = chunkMetaData.getStatistics().getFirstValue();
+    if (firstVal == null) {
+      throw new QueryProcessException("chunkMetaData contains no FIRST value");
+    }
+    resultData.putTimeAndValue(0, firstVal);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
index 643a2c9..94a58f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -138,6 +139,12 @@ public class LastValueAggrFunc extends AggregateFunction {
     return false;
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    Object lastVal = chunkMetaData.getStatistics().getLastValue();
+    updateLastResult(chunkMetaData.getEndTime(), lastVal);
+  }
+
   private void updateLastResult(long time, Object value) {
     if (!resultData.isSetTime()) {
       resultData.putTimeAndValue(time, value);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index 5a566a7..d4251f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -126,6 +127,12 @@ public class MaxTimeAggrFunc extends AggregateFunction {
     return false;
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    long maxTimestamp = chunkMetaData.getEndTime();
+    updateMaxTimeResult(0, maxTimestamp);
+  }
+
   private void updateMaxTimeResult(long time, long value) {
     if (!resultData.isSetValue() || value >= resultData.getLongRet()) {
       resultData.setTimestamp(time);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index c4d39a5..a459793 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -159,6 +160,12 @@ public class MaxValueAggrFunc extends AggregateFunction {
     return false;
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    Comparable<Object> maxVal = (Comparable<Object>) chunkMetaData.getStatistics().getMaxValue();
+    updateResult(maxVal);
+  }
+
   private void updateResult(Comparable<Object> maxVal) {
     if (maxVal == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index b237bcb..7e1487a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -153,4 +154,13 @@ public class MinTimeAggrFunc extends AggregateFunction {
     return resultData.isSetValue();
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    if (resultData.isSetValue()) {
+      return;
+    }
+    long time = chunkMetaData.getStartTime();
+    resultData.putTimeAndValue(0, time);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index 323f3c4..b91fd2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -154,6 +155,12 @@ public class MinValueAggrFunc extends AggregateFunction {
     return false;
   }
 
+  @Override
+  public void calculateValueFromChunkMetaData(ChunkMetaData chunkMetaData) {
+    Comparable<Object> minVal = (Comparable<Object>) chunkMetaData.getStatistics().getMinValue();
+    updateResult(minVal);
+  }
+
   private void updateResult(Comparable<Object> minVal) {
     if (minVal == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 84fd17d..73536e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -30,8 +30,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrFunc;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
@@ -45,6 +47,7 @@ import org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader
 import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -134,10 +137,10 @@ public class AggregateEngineExecutor {
   /**
    * calculation aggregate result with only time filter or no filter for one series.
    *
-   * @param function aggregate function
-   * @param sequenceReader sequence data reader
+   * @param function         aggregate function
+   * @param sequenceReader   sequence data reader
    * @param unSequenceReader unsequence data reader
-   * @param filter time filter or null
+   * @param filter           time filter or null
    * @return one series aggregate result data
    */
   private AggreResultData aggregateWithoutValueFilter(AggregateFunction function,
@@ -148,23 +151,38 @@ public class AggregateEngineExecutor {
           filter);
     }
 
-    while (sequenceReader.hasNext()) {
-      PageHeader pageHeader = sequenceReader.nextPageHeader();
-      // judge if overlap with unsequence data
-      if (canUseHeader(function, pageHeader, unSequenceReader, filter)) {
-        // cal by pageHeader
-        function.calculateValueFromPageHeader(pageHeader);
-        sequenceReader.skipPageData();
-      } else {
-        // cal by pageData
-        function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+    while (sequenceReader.hasNextChunk()) {
+      ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+      if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+          chunkMetaData.getEndTime(), unSequenceReader, filter)) {
+        function.calculateValueFromChunkMetaData(chunkMetaData);
+        if (isEarlyBreakFunc(function)) {
+          break;
+        }
+        continue;
       }
+      while (sequenceReader.hasNextPageInCurrentChunk()) {
+        PageHeader pageHeader = sequenceReader.nextPageHeader();
+        // judge if overlap with unsequence data
+        if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+            pageHeader.getEndTime(),
+            unSequenceReader, filter)) {
+          // cal by pageHeader
+          function.calculateValueFromPageHeader(pageHeader);
+          if (isEarlyBreakFunc(function)) {
+            break;
+          }
+          sequenceReader.skipPageData();
+        } else {
+          // cal by pageData
+          function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
+        }
 
-      if (function.isCalculatedAggregationResult()) {
-        return function.getResult();
+        if (function.isCalculatedAggregationResult()) {
+          return function.getResult();
+        }
       }
     }
-
     // cal with unsequence data
     if (unSequenceReader.hasNext()) {
       function.calculateValueFromUnsequenceReader(unSequenceReader);
@@ -172,20 +190,19 @@ public class AggregateEngineExecutor {
     return function.getResult();
   }
 
+  private boolean isEarlyBreakFunc(AggregateFunction function) {
+    if (function instanceof FirstValueAggrFunc || function instanceof MinTimeAggrFunc) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * determine whether pageHeader can be used to compute aggregation results.
    */
-  private boolean canUseHeader(AggregateFunction function, PageHeader pageHeader,
+  private boolean canUseHeader(AggregateFunction function, long minTime, long maxTime,
       IPointReader unSequenceReader, Filter filter)
       throws IOException, QueryProcessException {
-    // if page data is memory data.
-    if (pageHeader == null) {
-      return false;
-    }
-
-    long minTime = pageHeader.getStartTime();
-    long maxTime = pageHeader.getEndTime();
-
     // If there are points in the page that do not satisfy the time filter,
     // page header cannot be used to calculate.
     if (filter != null && !filter.containStartEndTime(minTime, maxTime)) {
@@ -202,8 +219,8 @@ public class AggregateEngineExecutor {
   /**
    * handle last and max_time aggregate function with only time filter or no filter.
    *
-   * @param function aggregate function
-   * @param sequenceReader sequence data reader
+   * @param function         aggregate function
+   * @param sequenceReader   sequence data reader
    * @param unSequenceReader unsequence data reader
    * @return BatchData-aggregate result
    */
@@ -211,41 +228,46 @@ public class AggregateEngineExecutor {
       IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
       throws IOException, QueryProcessException {
     long lastBatchTimeStamp = Long.MIN_VALUE;
-    boolean isChunkEnd = false;
-    while (sequenceReader.hasNext()) {
-      PageHeader pageHeader = sequenceReader.nextPageHeader();
-      // judge if overlap with unsequence data
-      if (canUseHeader(function, pageHeader, unSequenceReader, timeFilter)) {
-        // cal by pageHeader
-        function.calculateValueFromPageHeader(pageHeader);
-        sequenceReader.skipPageData();
-
-        if (lastBatchTimeStamp > pageHeader.getStartTime()) {
-          // the chunk is end.
-          isChunkEnd = true;
-        } else {
-          // current page and last page are in the same chunk.
+
+    while (sequenceReader.hasNextChunk()) {
+      ChunkMetaData chunkMetaData = sequenceReader.nextChunkMeta();
+      // if chunkMetaData can be used, skip reading this chunk
+      if (chunkMetaData != null && canUseHeader(function, chunkMetaData.getStartTime(),
+          chunkMetaData.getEndTime(), unSequenceReader, timeFilter)) {
+        function.calculateValueFromChunkMetaData(chunkMetaData);
+        // if this chunk is the last chunk, break out of the loop
+        if (lastBatchTimeStamp > chunkMetaData.getStartTime()) {
+          break;
+        }
+        lastBatchTimeStamp = chunkMetaData.getStartTime();
+        continue;
+      }
+      // if chunkMetaData can't be used, try to use pageHeader
+      while (sequenceReader.hasNextPageInCurrentChunk()) {
+        PageHeader pageHeader = sequenceReader.nextPageHeader();
+        if (pageHeader != null && canUseHeader(function, pageHeader.getStartTime(),
+            pageHeader.getEndTime(),
+            unSequenceReader, timeFilter)) {
+          // cal by pageHeader
+          function.calculateValueFromPageHeader(pageHeader);
+          sequenceReader.skipPageData();
+          // if this page is the last one, break out of the loop
+          if (lastBatchTimeStamp > pageHeader.getStartTime()) {
+            break;
+          }
           lastBatchTimeStamp = pageHeader.getStartTime();
+          continue;
         }
-      } else {
-        // cal by pageData
         BatchData batchData = sequenceReader.nextBatch();
         if (batchData.length() > 0) {
           if (lastBatchTimeStamp > batchData.currentTime()) {
-            // the chunk is end.
-            isChunkEnd = true;
-          } else {
-            // current page and last page are in the same chunk.
-            lastBatchTimeStamp = batchData.currentTime();
+            break;
           }
+          lastBatchTimeStamp = batchData.currentTime();
           function.calculateValueFromPageData(batchData, unSequenceReader);
         }
       }
-      if (isChunkEnd) {
-        break;
-      }
     }
-
     // cal with unsequence data
     if (unSequenceReader.hasNext()) {
       function.calculateValueFromUnsequenceReader(unSequenceReader);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
index 7cb9112..419ca32 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java
@@ -20,9 +20,15 @@ package org.apache.iotdb.db.query.reader;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 
 public interface IAggregateReader extends IBatchReader {
 
+  boolean hasNextChunk() throws IOException;
+
+  ChunkMetaData nextChunkMeta();
+
+  boolean hasNextPageInCurrentChunk() throws IOException;
   /**
    * Returns meta-information of batch data.
    * <p>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
index b0885ae..1193ff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.chunkRelated;
 
+import java.io.IOException;
 import java.util.Iterator;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
@@ -25,6 +26,7 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.fileRelated.UnSealedTsFileIterateReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -112,6 +114,21 @@ public class MemChunkReader implements IPointReader, IAggregateReader {
   }
 
   @Override
+  public boolean hasNextChunk() throws IOException {
+    return hasNext();
+  }
+
+  @Override
+  public ChunkMetaData nextChunkMeta() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNextPageInCurrentChunk() throws IOException {
+    return hasNext();
+  }
+
+  @Override
   public PageHeader nextPageHeader() {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
index ff048fd..e79e10a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/FileSeriesReaderAdapter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.fileRelated;
 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
 
@@ -43,6 +44,21 @@ public class FileSeriesReaderAdapter implements IAggregateReader {
   }
 
   @Override
+  public boolean hasNextChunk() throws IOException {
+    return fileSeriesReader.hasNextChunk();
+  }
+
+  @Override
+  public ChunkMetaData nextChunkMeta() {
+    return fileSeriesReader.currentChunkMeta();
+  }
+
+  @Override
+  public boolean hasNextPageInCurrentChunk() throws IOException {
+    return fileSeriesReader.hasNextPageInCurrentChunk();
+  }
+
+  @Override
   public PageHeader nextPageHeader() throws IOException {
     return fileSeriesReader.nextPageHeader();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
index 58ce739..36d9f66 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/IterateReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.universal;
 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
@@ -84,6 +85,35 @@ public abstract class IterateReader implements IAggregateReader {
   }
 
   @Override
+  public boolean hasNextChunk() throws IOException {
+
+    if (curReaderInitialized && currentSeriesReader.hasNextChunk()) {
+      return true;
+    } else {
+      curReaderInitialized = false;
+    }
+
+    while (nextSeriesReaderIndex < readerSize) {
+      boolean isConstructed = constructNextReader(nextSeriesReaderIndex++);
+      if (isConstructed && currentSeriesReader.hasNextChunk()) {
+        curReaderInitialized = true;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasNextPageInCurrentChunk() throws IOException {
+    return currentSeriesReader.hasNextPageInCurrentChunk();
+  }
+
+  @Override
+  public ChunkMetaData nextChunkMeta() {
+    return currentSeriesReader.nextChunkMeta();
+  }
+
+  @Override
   public void close() {
     // file stream is managed in QueryResourceManager.
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index ca43763..5f71935 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -38,6 +38,7 @@ public abstract class FileSeriesReader {
   private int chunkToRead;
 
   private BatchData data;
+  private ChunkMetaData chunkMetaData;
 
   /**
    * constructor of FileSeriesReader.
@@ -49,9 +50,9 @@ public abstract class FileSeriesReader {
   }
 
   /**
-   * check if current chunk has next batch data.
+   * check if all chunks has next batch data.
    *
-   * @return True if current chunk has next batch data
+   * @return True if all chunks has next batch data
    */
   public boolean hasNextBatch() throws IOException {
 
@@ -107,4 +108,39 @@ public abstract class FileSeriesReader {
   private ChunkMetaData nextChunkMeta() {
     return chunkMetaDataList.get(chunkToRead++);
   }
+
+  /**
+   * check whether the current chunk has next batch (page) data.
+   *
+   * @return True if the chunk reader is initialized and has next batch data
+   */
+  public boolean hasNextPageInCurrentChunk() throws IOException {
+    if (chunkReader != null && chunkReader.hasNextBatch()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * check whether the current file has a next satisfied chunk; if so, init its chunk reader.
+   *
+   * @return True if a chunk satisfying the condition was found
+   */
+  public boolean hasNextChunk() throws IOException {
+    // current file still has chunks, init new chunk reader
+    while (chunkToRead < chunkMetaDataList.size()) {
+
+      chunkMetaData = nextChunkMeta();
+      if (chunkSatisfied(chunkMetaData)) {
+        // chunk metadata satisfy the condition
+        initChunkReader(chunkMetaData);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public ChunkMetaData currentChunkMeta() {
+    return chunkMetaData;
+  }
 }