Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/29 10:03:03 UTC

[incubator-iotdb] branch optimize_series_reader created (now 9aa6176)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 9aa6176  reorganize series reader and aggregate reader

This branch includes the following new commits:

     new 9aa6176  reorganize series reader and aggregate reader

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: reorganize series reader and aggregate reader

Posted by qi...@apache.org.

qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9aa61760a9303f42b5bb3f285ccaca92964e5f04
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Feb 29 18:02:39 2020 +0800

    reorganize series reader and aggregate reader
---
 .../SystemDesign/5-DataQuery/2-SeriesReader.md     |  14 +-
 .../reader/series/SeriesRawDataBatchReader.java    |  49 ++-
 .../reader/series/SeriesRawDataPointReader.java    |  43 +--
 .../iotdb/db/query/reader/series/SeriesReader.java | 340 ++++++++++++---------
 .../file/metadata/statistics/BinaryStatistics.java |   2 +-
 .../metadata/statistics/BooleanStatistics.java     |   2 +-
 .../file/metadata/statistics/DoubleStatistics.java |   2 +-
 .../file/metadata/statistics/FloatStatistics.java  |   2 +-
 .../metadata/statistics/IntegerStatistics.java     |   2 +-
 .../file/metadata/statistics/LongStatistics.java   |   2 +-
 .../file/metadata/statistics/Statistics.java       |   5 +
 11 files changed, 256 insertions(+), 207 deletions(-)

diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
index 54b0f38..8bbc548 100644
--- a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
+++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
@@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {
 
 	Reference to the current first chunk meta data
 
-* private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
+* private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
       new PriorityQueue<>(
           Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));
    
@@ -272,27 +272,27 @@ while (aggregateReader.hasNextChunk()) {
 
 This method determines whether there is a next page; it is typically used to keep unpacking `firstChunkMetaData` into pages when it cannot be used directly.
 
-First, `fillOverlappedPageReaders()` is called to unpack `firstChunkMetaData` into `PageReader`s, all of which are put into `overlappedPageReaders`. `hasCachedFirstChunkMetadata` is then set to `false` and `firstChunkMetaData` to `null`. If `overlappedPageReaders` is empty, return `false`; otherwise return `true`.
+First, `fillOverlappedPageReaders()` is called to unpack `firstChunkMetaData` into `PageReader`s, all of which are put into `cachedPageReaders`. `hasCachedFirstChunkMetadata` is then set to `false` and `firstChunkMetaData` to `null`. If `cachedPageReaders` is empty, return `false`; otherwise return `true`.
 
 #### isPageOverlapped()
 
 This method determines whether any other page overlaps the current page.
 
-If `mergeReader` still has data, or `seqChunkMetadatas` contains metadata that overlaps in time with the first `PageReader` in `overlappedPageReaders`, or `unseqChunkMetadatas` does, return `true`; otherwise return `false`.
+If `mergeReader` still has data, or `seqChunkMetadatas` contains metadata that overlaps in time with the first `PageReader` in `cachedPageReaders`, or `unseqChunkMetadatas` does, return `true`; otherwise return `false`.
 
 #### nextPage()
 
 Must be used in combination with the `isPageOverlapped()` method.
 
-When the first page in `overlappedPageReaders` has no other page overlapping it, directly fetch all data satisfying the filter from that first page of `overlappedPageReaders`.
+When the first page in `cachedPageReaders` has no other page overlapping it, directly fetch all data satisfying the filter from that first page of `cachedPageReaders`.
 
 #### currentPageStatistics()
 
-Returns the statistics of the first page in `overlappedPageReaders`.
+Returns the statistics of the first page in `cachedPageReaders`.
 
 #### skipCurrentPage()
 
-Skips the current page: simply remove the first `PageReader` from `overlappedPageReaders`.
+Skips the current page: simply remove the first `PageReader` from `cachedPageReaders`.
 
 #### hasNextOverlappedPage()
 
@@ -300,7 +300,7 @@ while (aggregateReader.hasNextChunk()) {
 
 If `hasCachedNextBatch` is `true`, return `true` directly.
 
-Otherwise, first call `putAllDirectlyOverlappedPageReadersIntoMergeReader()` to put every `PageReader` overlapping the first page of `overlappedPageReaders` into `mergeReader`. `mergeReader` maintains a `currentLargestEndTime` variable, updated each time a new reader is added, which records the largest end time among the readers added to `mergeReader` so far.
+Otherwise, first call `putAllDirectlyOverlappedPageReadersIntoMergeReader()` to put every `PageReader` overlapping the first page of `cachedPageReaders` into `mergeReader`. `mergeReader` maintains a `currentLargestEndTime` variable, updated each time a new reader is added, which records the largest end time among the readers added to `mergeReader` so far.
 
 Then take the current largest end time from `mergeReader` as the end time of the batch to be returned, denoted `currentPageEndTime`. Next, iterate through `mergeReader` until the current timestamp exceeds `currentPageEndTime`.
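The merge step described above can be sketched as follows. This is a simplified, hypothetical model (plain timestamps instead of `TimeValuePair`s, no version-based deduplication): every overlapped page feeds its timestamps into a min-heap, `currentPageEndTime` is the largest end time among those pages, and the returned batch stops at that boundary. The class and method names here are illustrative stand-ins, not the actual IoTDB classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class OverlapMergeSketch {

  /** Merge timestamps of overlapping pages up to the largest end time seen. */
  static List<Long> nextOverlappedBatch(List<long[]> overlappedPages) {
    long currentPageEndTime = Long.MIN_VALUE;
    PriorityQueue<Long> mergeReader = new PriorityQueue<>();
    for (long[] page : overlappedPages) {
      // page timestamps are assumed sorted; the last one is the page's end time
      currentPageEndTime = Math.max(currentPageEndTime, page[page.length - 1]);
      for (long t : page) {
        mergeReader.add(t);
      }
    }
    List<Long> batch = new ArrayList<>();
    // stop once the current timestamp exceeds currentPageEndTime
    while (!mergeReader.isEmpty() && mergeReader.peek() <= currentPageEndTime) {
      batch.add(mergeReader.poll());
    }
    return batch;
  }
}
```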
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 8e4c69f..9475b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.series;
 
+import java.util.LinkedList;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -42,6 +43,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
   private BatchData batchData;
   private boolean hasCachedBatchData = false;
 
+
   public SeriesRawDataBatchReader(SeriesReader seriesReader) {
     this.seriesReader = seriesReader;
   }
@@ -71,29 +73,46 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
       return true;
     }
 
-    while (seriesReader.hasNextChunk()) {
-      while (seriesReader.hasNextPage()) {
+    /*
+     * consume overlapped data firstly
+     */
+    if (seriesReader.hasNextOverlappedPage()) {
+      batchData = seriesReader.nextOverlappedPage();
+      hasCachedBatchData = true;
+      return true;
+    }
+
+
+    /*
+     * consume pages secondly
+     */
+    if (seriesReader.hasNextPage()) {
+      if (!seriesReader.isPageOverlapped()) {
+        batchData = seriesReader.nextPage();
+      } else if (seriesReader.hasNextOverlappedPage()) {
+        batchData = seriesReader.nextOverlappedPage();
+      }
+      hasCachedBatchData = true;
+      return true;
+    }
+
+    /*
+     * consume next chunk finally
+     */
+    if (seriesReader.hasNextChunk()) {
+      if (seriesReader.hasNextPage()) {
         if (!seriesReader.isPageOverlapped()) {
           batchData = seriesReader.nextPage();
-          if (!batchData.hasCurrent()) {
-            continue;
-          }
-          hasCachedBatchData = true;
-          return true;
-        }
-        if (seriesReader.hasNextOverlappedPage()) {
+        } else if (seriesReader.hasNextOverlappedPage()) {
           batchData = seriesReader.nextOverlappedPage();
-          if (!batchData.hasCurrent()) {
-            continue;
-          }
-          hasCachedBatchData = true;
-          return true;
         }
+        hasCachedBatchData = true;
       }
     }
-    return false;
+    return hasCachedBatchData;
   }
 
+
   @Override
   public BatchData nextBatch() throws IOException {
     if (hasCachedBatchData || hasNextBatch()) {
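The refactor in this file (and the one that follows in `SeriesRawDataPointReader`) boils down to an adapter over a batch reader: instead of driving `SeriesReader` directly, a point-level view drains batches one value at a time. A minimal sketch, assuming a hypothetical `BatchSource` interface in place of `SeriesRawDataBatchReader`'s `hasNextBatch()`/`nextBatch()` contract:

```java
import java.util.ArrayList;
import java.util.List;

public class PointOverBatchSketch {

  // hypothetical stand-in for SeriesRawDataBatchReader's contract
  interface BatchSource {
    boolean hasNextBatch();
    long[] nextBatch();
  }

  /** Drains every batch and returns the values as a flat point sequence. */
  static List<Long> drainAsPoints(BatchSource batches) {
    List<Long> points = new ArrayList<>();
    while (batches.hasNextBatch()) {
      for (long v : batches.nextBatch()) {
        points.add(v); // one "time-value pair" at a time in the real reader
      }
    }
    return points;
  }

  /** A fixed, in-memory batch source for demonstration. */
  static BatchSource of(long[]... batches) {
    return new BatchSource() {
      private int next = 0;
      public boolean hasNextBatch() { return next < batches.length; }
      public long[] nextBatch() { return batches[next++]; }
    };
  }
}
```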
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
index 4d9941c..4be9c84 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
@@ -26,37 +26,16 @@ import java.io.IOException;
 
 public class SeriesRawDataPointReader implements IPointReader {
 
-  private final SeriesReader seriesReader;
+  private final SeriesRawDataBatchReader batchReader;
 
   private boolean hasCachedTimeValuePair;
   private BatchData batchData;
   private TimeValuePair timeValuePair;
 
   public SeriesRawDataPointReader(SeriesReader seriesReader) {
-    this.seriesReader = seriesReader;
+    this.batchReader = new SeriesRawDataBatchReader(seriesReader);
   }
 
-  private boolean hasNext() throws IOException {
-    while (seriesReader.hasNextChunk()) {
-      while (seriesReader.hasNextPage()) {
-        if (seriesReader.hasNextOverlappedPage()) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private boolean hasNextSatisfiedInCurrentBatch() {
-    if (batchData != null && batchData.hasCurrent()) {
-      timeValuePair = new TimeValuePair(batchData.currentTime(),
-          batchData.currentTsPrimitiveType());
-      hasCachedTimeValuePair = true;
-      batchData.next();
-      return true;
-    }
-    return false;
-  }
 
   @Override
   public boolean hasNextTimeValuePair() throws IOException {
@@ -64,17 +43,25 @@ public class SeriesRawDataPointReader implements IPointReader {
       return true;
     }
 
-    if (hasNextSatisfiedInCurrentBatch()) {
+    if (batchData != null) {
+      timeValuePair = new TimeValuePair(batchData.currentTime(),
+          batchData.currentTsPrimitiveType());
+      hasCachedTimeValuePair = true;
+      batchData.next();
       return true;
     }
 
-    // has not cached timeValuePair
-    while (hasNext()) {
-      batchData = seriesReader.nextOverlappedPage();
-      if (hasNextSatisfiedInCurrentBatch()) {
+    while (batchReader.hasNextBatch()) {
+      batchData = batchReader.nextBatch();
+      if (batchData.hasCurrent()) {
+        timeValuePair = new TimeValuePair(batchData.currentTime(),
+            batchData.currentTsPrimitiveType());
+        hasCachedTimeValuePair = true;
+        batchData.next();
         return true;
       }
     }
+
     return false;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8b1f795..a299837 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -58,22 +58,38 @@ public class SeriesReader {
   private final Filter timeFilter;
   private final Filter valueFilter;
 
+
+  /*
+   * file cache
+   */
   private final List<TsFileResource> seqFileResource;
   private final PriorityQueue<TsFileResource> unseqFileResource;
 
+
+  /*
+   * chunk cache
+   */
+  private ChunkMetaData firstChunkMetaData;
   private final List<ChunkMetaData> seqChunkMetadatas = new LinkedList<>();
   private final PriorityQueue<ChunkMetaData> unseqChunkMetadatas =
       new PriorityQueue<>(Comparator.comparingLong(ChunkMetaData::getStartTime));
 
-  private boolean hasCachedFirstChunkMetadata;
-  private ChunkMetaData firstChunkMetaData;
-
-  private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
+  /*
+   * page cache
+   */
+  private VersionPair<IPageReader> firstPageReader;
+  private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
       new PriorityQueue<>(
           Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));
 
+  /*
+   * point cache
+   */
   private PriorityMergeReader mergeReader = new PriorityMergeReader();
 
+  /*
+   * result cache
+   */
   private boolean hasCachedNextBatch;
   private BatchData cachedBatchData;
 
@@ -104,49 +120,24 @@ public class SeriesReader {
 
 
   public boolean hasNextChunk() throws IOException {
-    if (hasCachedFirstChunkMetadata) {
-      return true;
+
+    if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader.hasNextTimeValuePair()) {
+      throw new IOException("all cached pages should be consumed first");
     }
+
     // init first chunk metadata whose startTime is minimum
-    tryToInitFirstChunk();
+    tryToUnpackAllOverlappedFilesToChunkMetadatas();
 
-    return hasCachedFirstChunkMetadata;
+    return firstChunkMetaData != null;
   }
 
-  /**
-   * Because seq data and unseq data intersect, the minimum startTime taken from two files at a time
-   * is used as the reference time to start reading data
-   */
-  private void tryToInitFirstChunk() throws IOException {
-    tryToFillChunkMetadatas();
-    hasCachedFirstChunkMetadata = true;
-    if (!seqChunkMetadatas.isEmpty() && unseqChunkMetadatas.isEmpty()) {
-      // only has seq
-      firstChunkMetaData = seqChunkMetadatas.remove(0);
-    } else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
-      // only has unseq
-      firstChunkMetaData = unseqChunkMetadatas.poll();
-    } else if (!seqChunkMetadatas.isEmpty()) {
-      // has seq and unseq
-      if (seqChunkMetadatas.get(0).getStartTime() <= unseqChunkMetadatas.peek().getStartTime()) {
-        firstChunkMetaData = seqChunkMetadatas.remove(0);
-      } else {
-        firstChunkMetaData = unseqChunkMetadatas.poll();
-      }
-    } else {
-      // no seq nor unseq
-      hasCachedFirstChunkMetadata = false;
-    }
-    tryToFillChunkMetadatas();
-  }
 
   public boolean isChunkOverlapped() {
     Statistics chunkStatistics = firstChunkMetaData.getStatistics();
-    return mergeReader.hasNextTimeValuePair()
-        || (!seqChunkMetadatas.isEmpty()
-        && chunkStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime())
-        || (!unseqChunkMetadatas.isEmpty()
-        && chunkStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime());
+    return !seqChunkMetadatas.isEmpty()
+        && chunkStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime()
+        || !unseqChunkMetadatas.isEmpty()
+        && chunkStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime();
   }
 
   public Statistics currentChunkStatistics() {
@@ -154,32 +145,52 @@ public class SeriesReader {
   }
 
   public void skipCurrentChunk() {
-    hasCachedFirstChunkMetadata = false;
     firstChunkMetaData = null;
   }
 
   /**
-   * This method should be called after hasNxtChunk
-   * @return
-   * @throws IOException
+   * This method should be called after hasNextChunk()
+   * make sure that all overlapped pages are consumed before
    */
   public boolean hasNextPage() throws IOException {
-    if (!overlappedPageReaders.isEmpty()) {
-      return true;
+    if (mergeReader.hasNextTimeValuePair()) {
+      throw new IOException("all overlapped pages should be consumed first");
     }
 
-    fillOverlappedPageReaders();
+    if (firstChunkMetaData != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToCachedPageReaders(firstChunkMetaData.getEndTime());
+    } else {
+      /*
+       * first chunk metadata is already unpacked
+       */
+      if (firstPageReader == null && !cachedPageReaders.isEmpty()) {
+        firstPageReader = cachedPageReaders.poll();
+      }
+    }
 
-    return !overlappedPageReaders.isEmpty();
+    return firstPageReader != null;
   }
 
-  private void fillOverlappedPageReaders() throws IOException {
-    if (!hasCachedFirstChunkMetadata) {
-      return;
+
+  private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime) throws IOException {
+    while (!seqChunkMetadatas.isEmpty() && endTime >= seqChunkMetadatas.get(0).getStartTime()) {
+      unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
+    }
+    while (!unseqChunkMetadatas.isEmpty() && endTime >= unseqChunkMetadatas.peek().getStartTime()) {
+      unpackOneChunkMetaData(unseqChunkMetadatas.poll());
+    }
+
+    if (firstChunkMetaData != null && endTime >= firstChunkMetaData.getStartTime()) {
+      unpackOneChunkMetaData(firstChunkMetaData);
+      firstChunkMetaData = null;
+    }
+
+    if (firstPageReader == null && !cachedPageReaders.isEmpty()) {
+      firstPageReader = cachedPageReaders.poll();
     }
-    unpackOneChunkMetaData(firstChunkMetaData);
-    hasCachedFirstChunkMetadata = false;
-    firstChunkMetaData = null;
   }
 
   private void unpackOneChunkMetaData(ChunkMetaData chunkMetaData) throws IOException {
@@ -187,26 +198,46 @@ public class SeriesReader {
         .getPageReaderList()
         .forEach(
             pageReader ->
-                overlappedPageReaders.add(
+                cachedPageReaders.add(
                     new VersionPair(chunkMetaData.getVersion(), pageReader)));
   }
 
+  /**
+   * This method should be called after calling hasNextPage.
+   */
+  protected boolean isPageOverlapped() throws IOException {
+    if (firstPageReader == null) {
+      throw new IOException("no next page, make sure hasNextPage() is true");
+    }
+
+    Statistics firstPageStatistics = firstPageReader.data.getStatistics();
+
+    return !cachedPageReaders.isEmpty() &&
+        firstPageStatistics.getEndTime() >= cachedPageReaders.peek().data.getStatistics()
+            .getStartTime();
+  }
 
   /**
    * This method should only be used when the method isPageOverlapped() return true.
-   * @return
-   * @throws IOException
    */
   protected BatchData nextPage() throws IOException {
-    if (overlappedPageReaders.isEmpty()) {
-      throw new IOException("overlappedPageReaders is empty, hasNextPage and isPageOverlapped methods should be called first");
+    if (isPageOverlapped()) {
+      throw new IOException("next page is overlapped, make sure isPageOverlapped is false");
     }
 
-    BatchData pageData = overlappedPageReaders.poll().data.getAllSatisfiedPageData();
-    // only need to consider valueFilter because timeFilter has been set into the page reader
+    BatchData pageData = firstPageReader.data.getAllSatisfiedPageData();
+
+    /*
+     * no value filter
+     * only need to consider valueFilter because timeFilter has been set into the page reader
+     */
     if (valueFilter == null) {
       return pageData;
     }
+
+    /*
+     * has value filter
+     */
     BatchData batchData = new BatchData(pageData.getDataType());
     while (pageData.hasCurrent()) {
       if (valueFilter.satisfy(pageData.currentTime(), pageData.currentValue())) {
@@ -217,39 +248,19 @@ public class SeriesReader {
     return batchData;
   }
 
-  /**
-   * This method should be called after calling hasNextPage.
-   * @return
-   * @throws IOException
-   */
-  protected boolean isPageOverlapped() throws IOException {
-    if (overlappedPageReaders.isEmpty()) {
-      throw new IOException("overlappedPageReaders is empty, hasNextPage method should be called first");
-    }
-
-    Statistics pageStatistics = overlappedPageReaders.peek().data.getStatistics();
-    return mergeReader.hasNextTimeValuePair()
-        || (!seqChunkMetadatas.isEmpty()
-        && pageStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime())
-        || (!unseqChunkMetadatas.isEmpty()
-        && pageStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime());
-  }
-
   public Statistics currentPageStatistics() throws IOException {
-    if (overlappedPageReaders.isEmpty() || overlappedPageReaders.peek().data == null) {
-      throw new IOException("No next page statistics.");
+    if (firstPageReader == null) {
+      throw new IOException("No next page.");
     }
-    return overlappedPageReaders.peek().data.getStatistics();
+    return firstPageReader.data.getStatistics();
   }
 
   public void skipCurrentPage() {
-    overlappedPageReaders.poll();
+    firstPageReader = null;
   }
 
   /**
    * This method should be called after hasNextChunk and hasNextPage methods.
-   * @return
-   * @throws IOException
    */
   public boolean hasNextOverlappedPage() throws IOException {
 
@@ -257,93 +268,80 @@ public class SeriesReader {
       return true;
     }
 
-    putAllDirectlyOverlappedPageReadersIntoMergeReader();
+    tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader();
 
     if (mergeReader.hasNextTimeValuePair()) {
       cachedBatchData = new BatchData(dataType);
       long currentPageEndTime = mergeReader.getCurrentLargestEndTime();
+
       while (mergeReader.hasNextTimeValuePair()) {
+
         TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
         if (timeValuePair.getTimestamp() > currentPageEndTime) {
           break;
         }
-        // unpack all overlapped chunks
-        while (true) {
-          tryToFillChunkMetadatas();
-          boolean hasOverlappedChunkMetadata = false;
-          if (!seqChunkMetadatas.isEmpty()
-              && timeValuePair.getTimestamp() >= seqChunkMetadatas.get(0).getStartTime()) {
-            unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
-            hasOverlappedChunkMetadata = true;
-          }
-          if (!unseqChunkMetadatas.isEmpty()
-              && timeValuePair.getTimestamp() >= unseqChunkMetadatas.peek().getStartTime()) {
-            unpackOneChunkMetaData(unseqChunkMetadatas.poll());
-            hasOverlappedChunkMetadata = true;
-          }
-          if (!hasOverlappedChunkMetadata) {
-            break;
-          }
-        }
 
-        // put all overlapped pages into merge reader
-        while (!overlappedPageReaders.isEmpty()
-            && timeValuePair.getTimestamp()
-            >= overlappedPageReaders.peek().data.getStatistics().getStartTime()) {
-          VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
-          mergeReader.addReader(
-              pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
-              pageReader.data.getStatistics().getEndTime());
-        }
+        unpackAllOverlappedTsFilesToChunkMetadatas(timeValuePair.getTimestamp());
+        unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp());
+        unpackAllOverlappedCachedPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+        cachedBatchData.putAnObject(
+            timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+
+        mergeReader.nextTimeValuePair();
 
-        timeValuePair = mergeReader.nextTimeValuePair();
-        if (valueFilter == null || valueFilter
-                .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
-          cachedBatchData.putAnObject(
-              timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
-        }
       }
       hasCachedNextBatch = cachedBatchData.hasCurrent();
     }
     return hasCachedNextBatch;
   }
 
-  private void putAllDirectlyOverlappedPageReadersIntoMergeReader() throws IOException {
+  private void tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader() throws IOException {
+
+    /*
+     * no cached page readers
+     */
+    if (firstPageReader == null && cachedPageReaders.isEmpty()) {
+      return;
+    }
+
+    /*
+     * init firstPageReader
+     */
+    if (firstPageReader == null) {
+      firstPageReader = cachedPageReaders.poll();
+    }
+
     long currentPageEndTime;
     if (mergeReader.hasNextTimeValuePair()) {
       currentPageEndTime = mergeReader.getCurrentLargestEndTime();
-    } else if (!overlappedPageReaders.isEmpty()) {
-      // put the first page into merge reader
-      currentPageEndTime = overlappedPageReaders.peek().data.getStatistics().getEndTime();
-      VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
-      mergeReader.addReader(
-          pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
-          pageReader.data.getStatistics().getEndTime());
     } else {
-      return;
+      // put the first page into merge reader
+      currentPageEndTime = firstPageReader.data.getStatistics().getEndTime();
     }
 
-    // unpack all overlapped seq chunk meta data into overlapped page readers
-    while (!seqChunkMetadatas.isEmpty()
-        && currentPageEndTime >= seqChunkMetadatas.get(0).getStartTime()) {
-      unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
-      tryToFillChunkMetadatas();
+    /*
+     * put all currently directly overlapped page reader to merge reader
+     */
+    unpackAllOverlappedCachedPageReadersToMergeReader(currentPageEndTime);
+  }
+
+  private void unpackAllOverlappedCachedPageReadersToMergeReader(long endTime) throws IOException {
+    while (!cachedPageReaders.isEmpty() && endTime >= cachedPageReaders.peek().data
+        .getStatistics().getStartTime()) {
+      putPageReaderToMergeReader(cachedPageReaders.poll());
     }
-    // unpack all overlapped unseq chunk meta data into overlapped page readers
-    while (!unseqChunkMetadatas.isEmpty()
-        && currentPageEndTime >= unseqChunkMetadatas.peek().getStartTime()) {
-      unpackOneChunkMetaData(unseqChunkMetadatas.poll());
-      tryToFillChunkMetadatas();
+    if (firstPageReader != null && endTime >= firstPageReader.data.getStatistics().getStartTime()) {
+      putPageReaderToMergeReader(firstPageReader);
+      firstPageReader = null;
     }
+  }
 
-    // put all page that directly overlapped with first page into merge reader
-    while (!overlappedPageReaders.isEmpty()
-        && currentPageEndTime >= overlappedPageReaders.peek().data.getStatistics().getStartTime()) {
-      VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
-      mergeReader.addReader(
-          pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
-          pageReader.data.getStatistics().getEndTime());
-    }
+  private void putPageReaderToMergeReader(VersionPair<IPageReader> pageReader) throws IOException {
+    mergeReader.addReader(
+        pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(),
+        pageReader.version, pageReader.data.getStatistics().getEndTime());
   }
 
   public BatchData nextOverlappedPage() throws IOException {
@@ -426,26 +424,66 @@ public class SeriesReader {
     return unseqTsFilesSet;
   }
 
+
   /**
+   *
+   * unpack all overlapped seq/unseq files and find the first chunk metadata
+   *
    * Because there may be too many files in the scenario used by the user, we cannot open all the
    * chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
   * situation is likely to be common, but this approach keeps the system running smoothly
    */
-  private void tryToFillChunkMetadatas() throws IOException {
-    // Fill sequence chunkMetadatas until it is not empty
+  private void tryToUnpackAllOverlappedFilesToChunkMetadatas() throws IOException {
+
+    /*
+     * Fill sequence chunkMetadatas until it is not empty
+     */
     while (seqChunkMetadatas.isEmpty() && !seqFileResource.isEmpty()) {
       seqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(seqFileResource.remove(0)));
     }
 
-    // Fill unsequence chunkMetadatas until there are no overlapped unseqFileResources
+    /*
+     * Fill unsequence chunkMetadatas until it is not empty
+     */
     while (unseqChunkMetadatas.isEmpty() && !unseqFileResource.isEmpty()) {
       unseqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(unseqFileResource.poll()));
     }
-    while (!unseqChunkMetadatas.isEmpty() && !unseqFileResource.isEmpty()
-            && unseqChunkMetadatas.peek().getEndTime() >=
-            unseqFileResource.peek().getStartTimeMap().get(seriesPath.getDevice())) {
+
+    /*
+     * find first chunk metadata
+     */
+    if (!seqChunkMetadatas.isEmpty() && unseqChunkMetadatas.isEmpty()) {
+      // only has seq
+      firstChunkMetaData = seqChunkMetadatas.remove(0);
+    } else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
+      // only has unseq
+      firstChunkMetaData = unseqChunkMetadatas.poll();
+    } else if (!seqChunkMetadatas.isEmpty()) {
+      // has seq and unseq
+      if (seqChunkMetadatas.get(0).getStartTime() <= unseqChunkMetadatas.peek().getStartTime()) {
+        firstChunkMetaData = seqChunkMetadatas.remove(0);
+      } else {
+        firstChunkMetaData = unseqChunkMetadatas.poll();
+      }
+    }
+
+    /*
+     * unpack all directly overlapped seq/unseq files with first chunk metadata
+     */
+    if (firstChunkMetaData != null) {
+      unpackAllOverlappedTsFilesToChunkMetadatas(firstChunkMetaData.getEndTime());
+    }
+  }
+
+  private void unpackAllOverlappedTsFilesToChunkMetadatas(long endTime) throws IOException {
+    while (!unseqFileResource.isEmpty() && endTime >=
+        unseqFileResource.peek().getStartTimeMap().get(seriesPath.getDevice())) {
       unseqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(unseqFileResource.poll()));
     }
+    while (!seqFileResource.isEmpty() && endTime >=
+        seqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) {
+      seqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(seqFileResource.remove(0)));
+    }
   }
 
   public void setTimeFilter(long timestamp) {
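The unpack-on-demand pattern behind `unpackAllOverlappedTsFilesToChunkMetadatas(endTime)` can be sketched like this. The `long[] {startTime, endTime}` file model and the method shape are assumptions made for illustration only: files stay "closed" (unopened) until their start time actually overlaps the requested end time.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class LazyUnpackSketch {

  /**
   * Unpacks files into "chunk metadata" only while the head file's start time
   * overlaps endTime; remaining files are left untouched to avoid opening
   * everything at once. sortedFiles is assumed ordered by start time.
   */
  static List<long[]> unpackOverlapped(Deque<long[]> sortedFiles, long endTime) {
    List<long[]> chunkMetadatas = new ArrayList<>();
    while (!sortedFiles.isEmpty() && endTime >= sortedFiles.peekFirst()[0]) {
      chunkMetadatas.add(sortedFiles.pollFirst()); // "load" this file's chunks
    }
    return chunkMetadatas;
  }
}
```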
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index 3c0a3e4..64f1642 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -190,7 +190,7 @@ public class BinaryStatistics extends Statistics<Binary> {
 
   @Override
   public String toString() {
-    return "[fistValue:" + firstValue + ",lastValue:" + lastValue + "]";
+    return super.toString() + " [firstValue:" + firstValue + ",lastValue:" + lastValue + "]";
   }
 
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
index 96d6c63..fa879c6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
@@ -186,6 +186,6 @@ public class BooleanStatistics extends Statistics<Boolean> {
 
   @Override
   public String toString() {
-    return "[firstValue:" + firstValue + ",lastValue:" + lastValue + "]";
+    return super.toString() + " [firstValue:" + firstValue + ",lastValue:" + lastValue + "]";
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index 85ae01e..965419f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -215,7 +215,7 @@ public class DoubleStatistics extends Statistics<Double> {
 
   @Override
   public String toString() {
-    return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+    return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
         ",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
index 4daaf77..f7ad1f8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
@@ -209,7 +209,7 @@ public class FloatStatistics extends Statistics<Float> {
 
   @Override
   public String toString() {
-    return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+    return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
         ",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
index 3bf7a5d..f11d9da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
@@ -211,7 +211,7 @@ public class IntegerStatistics extends Statistics<Integer> {
 
   @Override
   public String toString() {
-    return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+    return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
         ",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
index 5741715..c270676 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
@@ -217,7 +217,7 @@ public class LongStatistics extends Statistics<Long> {
 
   @Override
   public String toString() {
-    return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+    return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
         ",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 4f4e6e8..244530b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -414,6 +414,11 @@ public abstract class Statistics<T> {
   }
 
   @Override
+  public String toString() {
+    return "startTime: " + startTime + " endTime: " + endTime;
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;
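The `toString()` changes across the statistics classes above all follow one pattern: the abstract `Statistics` base now prints the time range, and each concrete subclass prepends `super.toString()` to its own value summary. A sketch with simplified, hypothetical classes (field values chosen arbitrarily):

```java
public class StatsToStringSketch {

  abstract static class Stats {
    long startTime = 1;
    long endTime = 9;

    @Override
    public String toString() {
      // base class contributes the time range, as Statistics now does
      return "startTime: " + startTime + " endTime: " + endTime;
    }
  }

  static class LongStats extends Stats {
    long minValue = 2;
    long maxValue = 7;

    @Override
    public String toString() {
      // subclass composes the base time range with its value statistics
      return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + "]";
    }
  }
}
```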