You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/26 09:12:39 UTC

[incubator-iotdb] branch fix_chunk_reader created (now c0c85cd)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_chunk_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at c0c85cd  optimize PageReader and ChunkReader

This branch includes the following new commits:

     new c0c85cd  optimize PageReader and ChunkReader

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: optimize PageReader and ChunkReader

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_chunk_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c0c85cd3bb60e186213373f64c2d6c9cee788d68
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Dec 26 17:12:13 2019 +0800

    optimize PageReader and ChunkReader
---
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |  16 +-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   7 +-
 .../reader/chunkRelated/CachedDiskChunkReader.java |  14 +-
 .../query/reader/chunkRelated/ChunkReaderWrap.java |   3 +-
 .../query/reader/chunkRelated/DiskChunkReader.java |  13 +-
 .../chunkRelated/DiskChunkReaderByTimestamp.java   |   8 +-
 .../CachedUnseqResourceMergeReader.java            |   3 +-
 .../NewUnseqResourceMergeReader.java               |  14 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |   7 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |   3 +-
 .../read/reader/chunk/AbstractChunkReader.java     | 163 ---------------------
 .../tsfile/read/reader/chunk/ChunkReader.java      | 135 ++++++++++++++++-
 .../read/reader/chunk/ChunkReaderByTimestamp.java  |   2 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  | 135 ++---------------
 .../reader/series/AbstractFileSeriesReader.java    |  10 +-
 .../reader/series/FileSeriesReaderByTimestamp.java |  20 +--
 .../iotdb/tsfile/read/reader/PageReaderTest.java   |   7 +-
 17 files changed, 200 insertions(+), 360 deletions(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 9eba9fb..ddf11d5 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -85,15 +85,13 @@ public class TsFileSequenceRead {
             System.out
                 .println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
             PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
-                defaultTimeDecoder);
-            while (reader1.hasNextBatch()) {
-              BatchData batchData = reader1.nextBatch();
-              while (batchData.hasCurrent()) {
-                System.out.println(
-                    "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
-                        .currentValue());
-                batchData.next();
-              }
+                defaultTimeDecoder, null);
+            BatchData batchData = reader1.getAllSatisfiedPageData();
+            while (batchData.hasCurrent()) {
+              System.out.println(
+                  "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+                      .currentValue());
+              batchData.next();
             }
           }
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index cecf6ee..87f29c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ class MergeMultiChunkTask {
   private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
       long chunkLimitTime, int pathIdx) throws IOException {
     int cnt = 0;
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
     }
     cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
index bdca2e5..822815d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
@@ -24,16 +24,16 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 public class CachedDiskChunkReader implements IPointReader {
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
   private TimeValuePair prev;
   private TimeValuePair current;
 
-  public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+  public CachedDiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
     this.prev =
         TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public class CachedDiskChunkReader implements IPointReader {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
@@ -60,8 +60,8 @@ public class CachedDiskChunkReader implements IPointReader {
     if (data.hasCurrent()) {
       TimeValuePairUtils.setCurrentTimeValuePair(data, current());
     } else {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data.hasCurrent()) {
           TimeValuePairUtils.setCurrentTimeValuePair(data, current());
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
index 2bb2bb6..4dd57bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 
@@ -71,7 +70,7 @@ public class ChunkReaderWrap {
   public IPointReader getIPointReader() throws IOException {
     if (type.equals(ChunkReaderType.DISK_CHUNK)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+      ChunkReader chunkReader = new ChunkReader(chunk, filter);
       return new DiskChunkReader(chunkReader);
     } else {
       return new MemChunkReader(readOnlyMemChunk, filter);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
index 1e00642..240867e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
@@ -20,16 +20,15 @@ package org.apache.iotdb.db.query.reader.chunkRelated;
 
 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 /**
  * To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
  * <p>
  * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
  * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
@@ -37,10 +36,10 @@ import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
  */
 public class DiskChunkReader implements IPointReader, IBatchReader {
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
 
-  public DiskChunkReader(AbstractChunkReader chunkReader) {
+  public DiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
   }
 
@@ -49,8 +48,8 @@ public class DiskChunkReader implements IPointReader, IBatchReader {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
index 81d9866..521a4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
@@ -54,8 +54,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
         return null;
       } else {
         chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-        if (chunkReaderByTimestamp.hasNextBatch()) {
-          data = chunkReaderByTimestamp.nextBatch();
+        if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+          data = chunkReaderByTimestamp.nextPageData();
         } else {
           return null;
         }
@@ -70,8 +70,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
-      data = chunkReaderByTimestamp.nextBatch();
+    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+      data = chunkReaderByTimestamp.nextPageData();
       return true;
     }
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
index ee0fdd8..33c3462 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
 import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
     super(dataType);
     int priorityValue = 1;
     for (Chunk chunk : chunks) {
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index 61e8763..ce2d379 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -88,16 +88,18 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
       if (tsFileResource.isClosed()) {
         // get chunk metadata list of current closed tsfile
         currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
-        List<Modification> pathModifications = context
-            .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
-        if (!pathModifications.isEmpty()) {
-          QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
-        }
       } else {
-        // metadata list of already flushed chunk groups
+        // metadata list of already flushed chunks
         currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
       }
 
+      // get modifications and apply to chunk metadatas
+      List<Modification> pathModifications = context
+          .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+      if (!pathModifications.isEmpty()) {
+        QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+      }
+
       if (!currentChunkMetaDataList.isEmpty()) {
         TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
             .get(tsFileResource, tsFileResource.isClosed());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index a0d1253..eb065d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public class MergeUtils {
   }
 
   public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
     int ptWritten = 0;
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       for (int i = 0; i < batchData.length(); i++) {
         writeBatchPoint(batchData, i, chunkWriter);
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 45fa54c..8b88e8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -165,7 +164,7 @@ public class UnseqTsFileRecoverTest {
     int priorityValue = 1;
     for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       unSeqMergeReader
           .addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
       priorityValue++;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
deleted file mode 100644
index 1758ec7..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.read.reader.chunk;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.compress.IUnCompressor;
-import org.apache.iotdb.tsfile.encoding.common.EndianType;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-
-public abstract class AbstractChunkReader implements IBatchReader {
-
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkDataBuffer;
-
-  private IUnCompressor unCompressor;
-  private Decoder valueDecoder;
-  private Decoder timeDecoder = Decoder.getDecoderByType(
-      TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-      TSDataType.INT64);
-
-  protected Filter filter;
-
-  private BatchData data;
-
-  private PageHeader pageHeader;
-  private boolean hasCachedPageHeader;
-
-  /**
-   * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
-   */
-  protected long deletedAt;
-
-  /**
-   * constructor of ChunkReader.
-   *
-   * @param chunk input Chunk object
-   * @param filter filter
-   */
-  public AbstractChunkReader(Chunk chunk, Filter filter) {
-    this.filter = filter;
-    this.chunkDataBuffer = chunk.getData();
-    this.deletedAt = chunk.getDeletedAt();
-    EndianType endianType = chunk.getEndianType();
-    chunkHeader = chunk.getHeader();
-    this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
-    valueDecoder = Decoder
-        .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
-    valueDecoder.setEndianType(endianType);
-    data = new BatchData(chunkHeader.getDataType());
-    hasCachedPageHeader = false;
-  }
-
-  /**
-   * judge if has nextBatch.
-   */
-  public boolean hasNextBatch() {
-    if (hasCachedPageHeader) {
-      return true;
-    }
-    // construct next satisfied page header
-    while (chunkDataBuffer.remaining() > 0) {
-      // deserialize a PageHeader from chunkDataBuffer
-      pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-
-      // if the current page satisfies
-      if (pageSatisfied(pageHeader)) {
-        hasCachedPageHeader = true;
-        return true;
-      } else {
-        skipBytesInStreamByLength(pageHeader.getCompressedSize());
-      }
-    }
-    return false;
-  }
-
-  /**
-   * get next data batch.
-   *
-   * @return next data batch
-   * @throws IOException IOException
-   */
-  public BatchData nextBatch() throws IOException {
-    PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
-    hasCachedPageHeader = false;
-    if (pageReader.hasNextBatch()) {
-      data = pageReader.nextBatch();
-      return data;
-    }
-    return data;
-  }
-
-
-  public PageHeader nextPageHeader() {
-    return pageHeader;
-  }
-
-  public void skipPageData() {
-    skipBytesInStreamByLength(pageHeader.getCompressedSize());
-    hasCachedPageHeader = false;
-  }
-
-  private void skipBytesInStreamByLength(long length) {
-    chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
-  }
-
-  public abstract boolean pageSatisfied(PageHeader pageHeader);
-
-  private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
-      throws IOException {
-    byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
-    // already in memory
-    if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
-      throw new IOException(
-          "unexpected byte read length when read compressedPageBody. Expected:"
-              + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
-              .remaining());
-    }
-    chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
-    valueDecoder.reset();
-    ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
-    PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
-        valueDecoder, timeDecoder, filter);
-    reader.setDeletedAt(deletedAt);
-    return reader;
-  }
-
-  public void close() {
-  }
-
-  public ChunkHeader getChunkHeader() {
-    return chunkHeader;
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index cf831fc..1d8abdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -16,28 +16,147 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.tsfile.read.reader.chunk;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.common.EndianType;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+public class ChunkReader {
+
+  private ChunkHeader chunkHeader;
+  private ByteBuffer chunkDataBuffer;
+
+  private IUnCompressor unCompressor;
+  private Decoder valueDecoder;
+  private Decoder timeDecoder = Decoder.getDecoderByType(
+      TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+      TSDataType.INT64);
 
-public class ChunkReader extends AbstractChunkReader {
+  protected Filter filter;
 
+  private PageHeader pageHeader;
+  private boolean hasCachedPageHeader;
+
+  /**
+   * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+   */
+  protected long deletedAt;
+
+  /**
+   * constructor of ChunkReader.
+   *
+   * @param chunk input Chunk object
+   * @param filter filter
+   */
   public ChunkReader(Chunk chunk, Filter filter) {
-    super(chunk, filter);
+    this.filter = filter;
+    this.chunkDataBuffer = chunk.getData();
+    this.deletedAt = chunk.getDeletedAt();
+    EndianType endianType = chunk.getEndianType();
+    chunkHeader = chunk.getHeader();
+    this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+    valueDecoder = Decoder
+        .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+    valueDecoder.setEndianType(endianType);
+    hasCachedPageHeader = false;
+  }
+
+  /**
+   * judge if has next page whose page header satisfies the filter.
+   */
+  public boolean hasNextSatisfiedPage() {
+    if (hasCachedPageHeader) {
+      return true;
+    }
+    // construct next satisfied page header
+    while (chunkDataBuffer.remaining() > 0) {
+      // deserialize a PageHeader from chunkDataBuffer
+      pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+
+      // if the current page satisfies
+      if (pageSatisfied(pageHeader)) {
+        hasCachedPageHeader = true;
+        return true;
+      } else {
+        skipBytesInStreamByLength(pageHeader.getCompressedSize());
+      }
+    }
+    return false;
+  }
+
+  /**
+   * get next data batch.
+   *
+   * @return next data batch
+   * @throws IOException IOException
+   */
+  public BatchData nextPageData() throws IOException {
+    if(hasCachedPageHeader || hasNextSatisfiedPage()) {
+      PageReader pageReader = constructPageReaderForNextPage(pageHeader);
+      hasCachedPageHeader = false;
+      return pageReader.getAllSatisfiedPageData();
+    } else {
+      throw new IOException("no next page data");
+    }
+  }
+
+  public PageHeader nextPageHeader() {
+    return pageHeader;
+  }
+
+  public void skipPageData() {
+    skipBytesInStreamByLength(pageHeader.getCompressedSize());
+    hasCachedPageHeader = false;
+  }
+
+  private void skipBytesInStreamByLength(long length) {
+    chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
   }
 
-  @Override
   public boolean pageSatisfied(PageHeader pageHeader) {
-    if (pageHeader.getEndTime() < deletedAt) {
+    if (pageHeader.getEndTime() <= deletedAt) {
       return false;
     }
-    if (filter == null ) {
-      return true;
-    } else {
-      return filter.satisfy(pageHeader.getStatistics());
+    return filter == null || filter.satisfy(pageHeader.getStatistics());
+  }
+
+  private PageReader constructPageReaderForNextPage(PageHeader pageHeader)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+    // the chunk buffer does not hold a complete page body
+    if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+      throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength
+              + ". Actual:" + chunkDataBuffer.remaining());
     }
+
+    chunkDataBuffer.get(compressedPageBody);
+    valueDecoder.reset();
+    ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
+    PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
+        valueDecoder, timeDecoder, filter);
+    reader.setDeletedAt(deletedAt);
+    return reader;
   }
 
+  public void close() {
+  }
+
+  public ChunkHeader getChunkHeader() {
+    return chunkHeader;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 1630a63..1f487d0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 
-public class ChunkReaderByTimestamp extends AbstractChunkReader {
+public class ChunkReaderByTimestamp extends ChunkReader {
 
   private long currentTimestamp;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 7d113e9..2e43f77 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 
@@ -45,32 +47,24 @@ public class PageReader {
   /** value column in memory */
   private ByteBuffer valueBuffer;
 
-  private BatchData data = null;
-
-  private Filter filter = null;
+  private Filter filter;
 
+  /** Data whose timestamp <= deletedAt is considered deleted and is not returned. */
   private long deletedAt = Long.MIN_VALUE;
 
   public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
-      Decoder timeDecoder,
-      Filter filter) {
-    this(pageData, dataType, valueDecoder, timeDecoder);
-    this.filter = filter;
-  }
-
-  public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
-      Decoder timeDecoder) {
+      Decoder timeDecoder, Filter filter) {
     this.dataType = dataType;
     this.valueDecoder = valueDecoder;
     this.timeDecoder = timeDecoder;
+    this.filter = filter;
     splitDataToTimeStampAndValue(pageData);
   }
 
   /**
    * split pageContent into two stream: time and value
    *
-   * @param pageData
-   *            uncompressed bytes size of time column, time column, value column
+   * @param pageData uncompressed page: byte size of time column, then time column, then value column
    */
   private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
     int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
@@ -82,28 +76,7 @@ public class PageReader {
     valueBuffer.position(timeBufferLength);
   }
 
-  public boolean hasNextBatch() throws IOException {
-    return timeDecoder.hasNext(timeBuffer);
-  }
-
-  /**
-   * may return an empty BatchData
-   */
-  public BatchData nextBatch() throws IOException {
-    if (filter == null) {
-      data = getAllPageData();
-    } else {
-      data = getAllPageDataWithFilter();
-    }
-
-    return data;
-  }
-
-  public BatchData currentBatch() {
-    return data;
-  }
-
-  private BatchData getAllPageData() throws IOException {
+  public BatchData getAllSatisfiedPageData() throws IOException {
 
     BatchData pageData = new BatchData(dataType, true);
 
@@ -113,42 +86,42 @@ public class PageReader {
       switch (dataType) {
         case BOOLEAN:
           boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
             pageData.putTime(timestamp);
             pageData.putBoolean(aBoolean);
           }
           break;
         case INT32:
           int anInt = valueDecoder.readInt(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
             pageData.putTime(timestamp);
             pageData.putInt(anInt);
           }
           break;
         case INT64:
           long aLong = valueDecoder.readLong(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
             pageData.putTime(timestamp);
             pageData.putLong(aLong);
           }
           break;
         case FLOAT:
           float aFloat = valueDecoder.readFloat(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
             pageData.putTime(timestamp);
             pageData.putFloat(aFloat);
           }
           break;
         case DOUBLE:
           double aDouble = valueDecoder.readDouble(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
             pageData.putTime(timestamp);
             pageData.putDouble(aDouble);
           }
           break;
         case TEXT:
           Binary aBinary = valueDecoder.readBinary(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
             pageData.putTime(timestamp);
             pageData.putBinary(aBinary);
           }
@@ -160,86 +133,6 @@ public class PageReader {
     return pageData;
   }
 
-  private BatchData getAllPageDataWithFilter() throws IOException {
-    BatchData pageData = new BatchData(dataType, true);
-
-    while (timeDecoder.hasNext(timeBuffer)) {
-      long timestamp = timeDecoder.readLong(timeBuffer);
-
-      switch (dataType) {
-        case BOOLEAN:
-          readBoolean(pageData, timestamp);
-          break;
-        case INT32:
-          readInt(pageData, timestamp);
-          break;
-        case INT64:
-          readLong(pageData, timestamp);
-          break;
-        case FLOAT:
-          readFloat(pageData, timestamp);
-          break;
-        case DOUBLE:
-          readDouble(pageData, timestamp);
-          break;
-        case TEXT:
-          readText(pageData, timestamp);
-          break;
-        default:
-          throw new UnSupportedDataTypeException(String.valueOf(dataType));
-      }
-    }
-
-    return pageData;
-  }
-
-  private void readBoolean(BatchData pageData, long timestamp) {
-    boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
-      pageData.putTime(timestamp);
-      pageData.putBoolean(aBoolean);
-    }
-  }
-
-  private void readInt(BatchData pageData, long timestamp) {
-    int anInt = valueDecoder.readInt(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
-      pageData.putTime(timestamp);
-      pageData.putInt(anInt);
-    }
-  }
-
-  private void readLong(BatchData pageData, long timestamp) {
-    long aLong = valueDecoder.readLong(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
-      pageData.putTime(timestamp);
-      pageData.putLong(aLong);
-    }
-  }
-
-  private void readFloat(BatchData pageData, long timestamp) {
-    float aFloat = valueDecoder.readFloat(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
-      pageData.putTime(timestamp);
-      pageData.putFloat(aFloat);
-    }
-  }
-
-  private void readDouble(BatchData pageData, long timestamp) {
-    double aDouble = valueDecoder.readDouble(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
-      pageData.putTime(timestamp);
-      pageData.putDouble(aDouble);
-    }
-  }
-
-  private void readText(BatchData pageData, long timestamp) {
-    Binary aBinary = valueDecoder.readBinary(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
-      pageData.putTime(timestamp);
-      pageData.putBinary(aBinary);
-    }
-  }
 
   public void close() {
     timeBuffer = null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
index 069c435..14bb4be 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 import java.io.IOException;
 import java.util.List;
@@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
 
   protected IChunkLoader chunkLoader;
   protected List<ChunkMetaData> chunkMetaDataList;
-  protected AbstractChunkReader chunkReader;
+  protected ChunkReader chunkReader;
   private int chunkToRead;
 
   private BatchData data;
@@ -63,7 +63,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
   public boolean hasNextBatch() throws IOException {
 
     // current chunk has additional batch
-    if (chunkReader != null && chunkReader.hasNextBatch()) {
+    if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
       return true;
     }
 
@@ -75,7 +75,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
         // chunk metadata satisfy the condition
         initChunkReader(chunkMetaData);
 
-        if (chunkReader.hasNextBatch()) {
+        if (chunkReader.hasNextSatisfiedPage()) {
           return true;
         }
       }
@@ -87,7 +87,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
    * get next batch data.
    */
   public BatchData nextBatch() throws IOException {
-    data = chunkReader.nextBatch();
+    data = chunkReader.nextPageData();
     return data;
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
index 13be71b..1843229 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 
 /**
@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
   protected List<ChunkMetaData> chunkMetaDataList;
   private int currentChunkIndex = 0;
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private long currentTimestamp;
   private BatchData data = null; // current batch data
 
@@ -69,8 +69,8 @@ public class FileSeriesReaderByTimestamp {
         return null;
       }
 
-      if (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      if (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
       } else {
         return null;
       }
@@ -93,8 +93,8 @@ public class FileSeriesReaderByTimestamp {
         }
         return null;
       } else {
-        if (chunkReader.hasNextBatch()) {
-          data = chunkReader.nextBatch();
+        if (chunkReader.hasNextSatisfiedPage()) {
+          data = chunkReader.nextPageData();
         } else if (!constructNextSatisfiedChunkReader()) {
           return null;
         }
@@ -115,16 +115,16 @@ public class FileSeriesReaderByTimestamp {
       if (data != null && data.hasCurrent()) {
         return true;
       }
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data != null && data.hasCurrent()) {
           return true;
         }
       }
     }
     while (constructNextSatisfiedChunkReader()) {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data != null && data.hasCurrent()) {
           return true;
         }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
index bcce85c..6a55bd1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
@@ -171,14 +171,11 @@ public class PageReaderTest {
         ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
 
         PageReader pageReader = new PageReader(page, dataType, decoder,
-            new DeltaBinaryDecoder.LongDeltaDecoder());
+            new DeltaBinaryDecoder.LongDeltaDecoder(), null);
 
         int index = 0;
         long startTimestamp = System.currentTimeMillis();
-        BatchData data = null;
-        if (pageReader.hasNextBatch()) {
-          data = pageReader.nextBatch();
-        }
+        BatchData data = pageReader.getAllSatisfiedPageData();
         assert data != null;
 
         while (data.hasCurrent()) {