You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/12/26 09:12:40 UTC

[incubator-iotdb] 01/01: optimize PageReader and ChunkReader

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_chunk_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c0c85cd3bb60e186213373f64c2d6c9cee788d68
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Dec 26 17:12:13 2019 +0800

    optimize PageReader and ChunkReader
---
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |  16 +-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   7 +-
 .../reader/chunkRelated/CachedDiskChunkReader.java |  14 +-
 .../query/reader/chunkRelated/ChunkReaderWrap.java |   3 +-
 .../query/reader/chunkRelated/DiskChunkReader.java |  13 +-
 .../chunkRelated/DiskChunkReaderByTimestamp.java   |   8 +-
 .../CachedUnseqResourceMergeReader.java            |   3 +-
 .../NewUnseqResourceMergeReader.java               |  14 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |   7 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |   3 +-
 .../read/reader/chunk/AbstractChunkReader.java     | 163 ---------------------
 .../tsfile/read/reader/chunk/ChunkReader.java      | 135 ++++++++++++++++-
 .../read/reader/chunk/ChunkReaderByTimestamp.java  |   2 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  | 135 ++---------------
 .../reader/series/AbstractFileSeriesReader.java    |  10 +-
 .../reader/series/FileSeriesReaderByTimestamp.java |  20 +--
 .../iotdb/tsfile/read/reader/PageReaderTest.java   |   7 +-
 17 files changed, 200 insertions(+), 360 deletions(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 9eba9fb..ddf11d5 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -85,15 +85,13 @@ public class TsFileSequenceRead {
             System.out
                 .println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
             PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
-                defaultTimeDecoder);
-            while (reader1.hasNextBatch()) {
-              BatchData batchData = reader1.nextBatch();
-              while (batchData.hasCurrent()) {
-                System.out.println(
-                    "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
-                        .currentValue());
-                batchData.next();
-              }
+                defaultTimeDecoder, null);
+            BatchData batchData = reader1.getAllSatisfiedPageData();
+            while (batchData.hasCurrent()) {
+              System.out.println(
+                  "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+                      .currentValue());
+              batchData.next();
             }
           }
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index cecf6ee..87f29c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ class MergeMultiChunkTask {
   private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
       long chunkLimitTime, int pathIdx) throws IOException {
     int cnt = 0;
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
     }
     cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
index bdca2e5..822815d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
@@ -24,16 +24,16 @@ import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 public class CachedDiskChunkReader implements IPointReader {
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
   private TimeValuePair prev;
   private TimeValuePair current;
 
-  public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+  public CachedDiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
     this.prev =
         TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public class CachedDiskChunkReader implements IPointReader {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
@@ -60,8 +60,8 @@ public class CachedDiskChunkReader implements IPointReader {
     if (data.hasCurrent()) {
       TimeValuePairUtils.setCurrentTimeValuePair(data, current());
     } else {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data.hasCurrent()) {
           TimeValuePairUtils.setCurrentTimeValuePair(data, current());
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
index 2bb2bb6..4dd57bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 
@@ -71,7 +70,7 @@ public class ChunkReaderWrap {
   public IPointReader getIPointReader() throws IOException {
     if (type.equals(ChunkReaderType.DISK_CHUNK)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+      ChunkReader chunkReader = new ChunkReader(chunk, filter);
       return new DiskChunkReader(chunkReader);
     } else {
       return new MemChunkReader(readOnlyMemChunk, filter);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
index 1e00642..240867e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
@@ -20,16 +20,15 @@ package org.apache.iotdb.db.query.reader.chunkRelated;
 
 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 /**
  * To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
  * <p>
  * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
  * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
@@ -37,10 +36,10 @@ import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
  */
 public class DiskChunkReader implements IPointReader, IBatchReader {
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
 
-  public DiskChunkReader(AbstractChunkReader chunkReader) {
+  public DiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
   }
 
@@ -49,8 +48,8 @@ public class DiskChunkReader implements IPointReader, IBatchReader {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
index 81d9866..521a4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
@@ -54,8 +54,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
         return null;
       } else {
         chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-        if (chunkReaderByTimestamp.hasNextBatch()) {
-          data = chunkReaderByTimestamp.nextBatch();
+        if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+          data = chunkReaderByTimestamp.nextPageData();
         } else {
           return null;
         }
@@ -70,8 +70,8 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
-      data = chunkReaderByTimestamp.nextBatch();
+    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+      data = chunkReaderByTimestamp.nextPageData();
       return true;
     }
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
index ee0fdd8..33c3462 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
 import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
     super(dataType);
     int priorityValue = 1;
     for (Chunk chunk : chunks) {
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index 61e8763..ce2d379 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -88,16 +88,18 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
       if (tsFileResource.isClosed()) {
         // get chunk metadata list of current closed tsfile
         currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
-        List<Modification> pathModifications = context
-            .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
-        if (!pathModifications.isEmpty()) {
-          QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
-        }
       } else {
-        // metadata list of already flushed chunk groups
+        // metadata list of already flushed chunks
         currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
       }
 
+      // get modifications and apply to chunk metadatas
+      List<Modification> pathModifications = context
+          .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+      if (!pathModifications.isEmpty()) {
+        QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+      }
+
       if (!currentChunkMetaDataList.isEmpty()) {
         TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
             .get(tsFileResource, tsFileResource.isClosed());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index a0d1253..eb065d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public class MergeUtils {
   }
 
   public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
     int ptWritten = 0;
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       for (int i = 0; i < batchData.length(); i++) {
         writeBatchPoint(batchData, i, chunkWriter);
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 45fa54c..8b88e8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -165,7 +164,7 @@ public class UnseqTsFileRecoverTest {
     int priorityValue = 1;
     for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       unSeqMergeReader
           .addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
       priorityValue++;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
deleted file mode 100644
index 1758ec7..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.read.reader.chunk;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.compress.IUnCompressor;
-import org.apache.iotdb.tsfile.encoding.common.EndianType;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-
-public abstract class AbstractChunkReader implements IBatchReader {
-
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkDataBuffer;
-
-  private IUnCompressor unCompressor;
-  private Decoder valueDecoder;
-  private Decoder timeDecoder = Decoder.getDecoderByType(
-      TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-      TSDataType.INT64);
-
-  protected Filter filter;
-
-  private BatchData data;
-
-  private PageHeader pageHeader;
-  private boolean hasCachedPageHeader;
-
-  /**
-   * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
-   */
-  protected long deletedAt;
-
-  /**
-   * constructor of ChunkReader.
-   *
-   * @param chunk input Chunk object
-   * @param filter filter
-   */
-  public AbstractChunkReader(Chunk chunk, Filter filter) {
-    this.filter = filter;
-    this.chunkDataBuffer = chunk.getData();
-    this.deletedAt = chunk.getDeletedAt();
-    EndianType endianType = chunk.getEndianType();
-    chunkHeader = chunk.getHeader();
-    this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
-    valueDecoder = Decoder
-        .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
-    valueDecoder.setEndianType(endianType);
-    data = new BatchData(chunkHeader.getDataType());
-    hasCachedPageHeader = false;
-  }
-
-  /**
-   * judge if has nextBatch.
-   */
-  public boolean hasNextBatch() {
-    if (hasCachedPageHeader) {
-      return true;
-    }
-    // construct next satisfied page header
-    while (chunkDataBuffer.remaining() > 0) {
-      // deserialize a PageHeader from chunkDataBuffer
-      pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-
-      // if the current page satisfies
-      if (pageSatisfied(pageHeader)) {
-        hasCachedPageHeader = true;
-        return true;
-      } else {
-        skipBytesInStreamByLength(pageHeader.getCompressedSize());
-      }
-    }
-    return false;
-  }
-
-  /**
-   * get next data batch.
-   *
-   * @return next data batch
-   * @throws IOException IOException
-   */
-  public BatchData nextBatch() throws IOException {
-    PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
-    hasCachedPageHeader = false;
-    if (pageReader.hasNextBatch()) {
-      data = pageReader.nextBatch();
-      return data;
-    }
-    return data;
-  }
-
-
-  public PageHeader nextPageHeader() {
-    return pageHeader;
-  }
-
-  public void skipPageData() {
-    skipBytesInStreamByLength(pageHeader.getCompressedSize());
-    hasCachedPageHeader = false;
-  }
-
-  private void skipBytesInStreamByLength(long length) {
-    chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
-  }
-
-  public abstract boolean pageSatisfied(PageHeader pageHeader);
-
-  private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
-      throws IOException {
-    byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
-    // already in memory
-    if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
-      throw new IOException(
-          "unexpected byte read length when read compressedPageBody. Expected:"
-              + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
-              .remaining());
-    }
-    chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
-    valueDecoder.reset();
-    ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
-    PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
-        valueDecoder, timeDecoder, filter);
-    reader.setDeletedAt(deletedAt);
-    return reader;
-  }
-
-  public void close() {
-  }
-
-  public ChunkHeader getChunkHeader() {
-    return chunkHeader;
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index cf831fc..1d8abdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -16,28 +16,147 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.tsfile.read.reader.chunk;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.common.EndianType;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+public class ChunkReader {
+
+  private ChunkHeader chunkHeader;
+  private ByteBuffer chunkDataBuffer;
+
+  private IUnCompressor unCompressor;
+  private Decoder valueDecoder;
+  private Decoder timeDecoder = Decoder.getDecoderByType(
+      TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+      TSDataType.INT64);
 
-public class ChunkReader extends AbstractChunkReader {
+  protected Filter filter;
 
+  private PageHeader pageHeader;
+  private boolean hasCachedPageHeader;
+
+  /**
+   * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+   */
+  protected long deletedAt;
+
+  /**
+   * constructor of ChunkReader.
+   *
+   * @param chunk input Chunk object
+   * @param filter filter
+   */
   public ChunkReader(Chunk chunk, Filter filter) {
-    super(chunk, filter);
+    this.filter = filter;
+    this.chunkDataBuffer = chunk.getData();
+    this.deletedAt = chunk.getDeletedAt();
+    EndianType endianType = chunk.getEndianType();
+    chunkHeader = chunk.getHeader();
+    this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+    valueDecoder = Decoder
+        .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+    valueDecoder.setEndianType(endianType);
+    hasCachedPageHeader = false;
+  }
+
+  /**
+   * judge if has next page whose page header satisfies the filter.
+   */
+  public boolean hasNextSatisfiedPage() {
+    if (hasCachedPageHeader) {
+      return true;
+    }
+    // construct next satisfied page header
+    while (chunkDataBuffer.remaining() > 0) {
+      // deserialize a PageHeader from chunkDataBuffer
+      pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+
+      // if the current page satisfies
+      if (pageSatisfied(pageHeader)) {
+        hasCachedPageHeader = true;
+        return true;
+      } else {
+        skipBytesInStreamByLength(pageHeader.getCompressedSize());
+      }
+    }
+    return false;
+  }
+
+  /**
+   * get next data batch.
+   *
+   * @return next data batch
+   * @throws IOException IOException
+   */
+  public BatchData nextPageData() throws IOException {
+    if(hasCachedPageHeader || hasNextSatisfiedPage()) {
+      PageReader pageReader = constructPageReaderForNextPage(pageHeader);
+      hasCachedPageHeader = false;
+      return pageReader.getAllSatisfiedPageData();
+    } else {
+      throw new IOException("no next page data");
+    }
+  }
+
+  public PageHeader nextPageHeader() {
+    return pageHeader;
+  }
+
+  public void skipPageData() {
+    skipBytesInStreamByLength(pageHeader.getCompressedSize());
+    hasCachedPageHeader = false;
+  }
+
+  private void skipBytesInStreamByLength(long length) {
+    chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
   }
 
-  @Override
   public boolean pageSatisfied(PageHeader pageHeader) {
-    if (pageHeader.getEndTime() < deletedAt) {
+    if (pageHeader.getEndTime() <= deletedAt) {
       return false;
     }
-    if (filter == null ) {
-      return true;
-    } else {
-      return filter.satisfy(pageHeader.getStatistics());
+    return filter == null || filter.satisfy(pageHeader.getStatistics());
+  }
+
+  private PageReader constructPageReaderForNextPage(PageHeader pageHeader)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+    // doesn't has a complete page body
+    if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+      throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength
+              + ". Actual:" + chunkDataBuffer.remaining());
     }
+
+    chunkDataBuffer.get(compressedPageBody);
+    valueDecoder.reset();
+    ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
+    PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
+        valueDecoder, timeDecoder, filter);
+    reader.setDeletedAt(deletedAt);
+    return reader;
   }
 
+  public void close() {
+  }
+
+  public ChunkHeader getChunkHeader() {
+    return chunkHeader;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 1630a63..1f487d0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.read.reader.chunk;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 
-public class ChunkReaderByTimestamp extends AbstractChunkReader {
+public class ChunkReaderByTimestamp extends ChunkReader {
 
   private long currentTimestamp;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 7d113e9..2e43f77 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 
@@ -45,32 +47,24 @@ public class PageReader {
   /** value column in memory */
   private ByteBuffer valueBuffer;
 
-  private BatchData data = null;
-
-  private Filter filter = null;
+  private Filter filter;
 
+  /** Data whose timestamp <= deletedAt should be considered deleted(not be returned). */
   private long deletedAt = Long.MIN_VALUE;
 
   public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
-      Decoder timeDecoder,
-      Filter filter) {
-    this(pageData, dataType, valueDecoder, timeDecoder);
-    this.filter = filter;
-  }
-
-  public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
-      Decoder timeDecoder) {
+      Decoder timeDecoder, Filter filter) {
     this.dataType = dataType;
     this.valueDecoder = valueDecoder;
     this.timeDecoder = timeDecoder;
+    this.filter = filter;
     splitDataToTimeStampAndValue(pageData);
   }
 
   /**
    * split pageContent into two stream: time and value
    *
-   * @param pageData
-   *            uncompressed bytes size of time column, time column, value column
+   * @param pageData uncompressed bytes size of time column, time column, value column
    */
   private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
     int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
@@ -82,28 +76,7 @@ public class PageReader {
     valueBuffer.position(timeBufferLength);
   }
 
-  public boolean hasNextBatch() throws IOException {
-    return timeDecoder.hasNext(timeBuffer);
-  }
-
-  /**
-   * may return an empty BatchData
-   */
-  public BatchData nextBatch() throws IOException {
-    if (filter == null) {
-      data = getAllPageData();
-    } else {
-      data = getAllPageDataWithFilter();
-    }
-
-    return data;
-  }
-
-  public BatchData currentBatch() {
-    return data;
-  }
-
-  private BatchData getAllPageData() throws IOException {
+  public BatchData getAllSatisfiedPageData() throws IOException {
 
     BatchData pageData = new BatchData(dataType, true);
 
@@ -113,42 +86,42 @@ public class PageReader {
       switch (dataType) {
         case BOOLEAN:
           boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
             pageData.putTime(timestamp);
             pageData.putBoolean(aBoolean);
           }
           break;
         case INT32:
           int anInt = valueDecoder.readInt(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
             pageData.putTime(timestamp);
             pageData.putInt(anInt);
           }
           break;
         case INT64:
           long aLong = valueDecoder.readLong(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
             pageData.putTime(timestamp);
             pageData.putLong(aLong);
           }
           break;
         case FLOAT:
           float aFloat = valueDecoder.readFloat(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
             pageData.putTime(timestamp);
             pageData.putFloat(aFloat);
           }
           break;
         case DOUBLE:
           double aDouble = valueDecoder.readDouble(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
             pageData.putTime(timestamp);
             pageData.putDouble(aDouble);
           }
           break;
         case TEXT:
           Binary aBinary = valueDecoder.readBinary(valueBuffer);
-          if (timestamp > deletedAt) {
+          if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
             pageData.putTime(timestamp);
             pageData.putBinary(aBinary);
           }
@@ -160,86 +133,6 @@ public class PageReader {
     return pageData;
   }
 
-  private BatchData getAllPageDataWithFilter() throws IOException {
-    BatchData pageData = new BatchData(dataType, true);
-
-    while (timeDecoder.hasNext(timeBuffer)) {
-      long timestamp = timeDecoder.readLong(timeBuffer);
-
-      switch (dataType) {
-        case BOOLEAN:
-          readBoolean(pageData, timestamp);
-          break;
-        case INT32:
-          readInt(pageData, timestamp);
-          break;
-        case INT64:
-          readLong(pageData, timestamp);
-          break;
-        case FLOAT:
-          readFloat(pageData, timestamp);
-          break;
-        case DOUBLE:
-          readDouble(pageData, timestamp);
-          break;
-        case TEXT:
-          readText(pageData, timestamp);
-          break;
-        default:
-          throw new UnSupportedDataTypeException(String.valueOf(dataType));
-      }
-    }
-
-    return pageData;
-  }
-
-  private void readBoolean(BatchData pageData, long timestamp) {
-    boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
-      pageData.putTime(timestamp);
-      pageData.putBoolean(aBoolean);
-    }
-  }
-
-  private void readInt(BatchData pageData, long timestamp) {
-    int anInt = valueDecoder.readInt(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
-      pageData.putTime(timestamp);
-      pageData.putInt(anInt);
-    }
-  }
-
-  private void readLong(BatchData pageData, long timestamp) {
-    long aLong = valueDecoder.readLong(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
-      pageData.putTime(timestamp);
-      pageData.putLong(aLong);
-    }
-  }
-
-  private void readFloat(BatchData pageData, long timestamp) {
-    float aFloat = valueDecoder.readFloat(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
-      pageData.putTime(timestamp);
-      pageData.putFloat(aFloat);
-    }
-  }
-
-  private void readDouble(BatchData pageData, long timestamp) {
-    double aDouble = valueDecoder.readDouble(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
-      pageData.putTime(timestamp);
-      pageData.putDouble(aDouble);
-    }
-  }
-
-  private void readText(BatchData pageData, long timestamp) {
-    Binary aBinary = valueDecoder.readBinary(valueBuffer);
-    if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
-      pageData.putTime(timestamp);
-      pageData.putBinary(aBinary);
-    }
-  }
 
   public void close() {
     timeBuffer = null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
index 069c435..14bb4be 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 
 import java.io.IOException;
 import java.util.List;
@@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
 
   protected IChunkLoader chunkLoader;
   protected List<ChunkMetaData> chunkMetaDataList;
-  protected AbstractChunkReader chunkReader;
+  protected ChunkReader chunkReader;
   private int chunkToRead;
 
   private BatchData data;
@@ -63,7 +63,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
   public boolean hasNextBatch() throws IOException {
 
     // current chunk has additional batch
-    if (chunkReader != null && chunkReader.hasNextBatch()) {
+    if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
       return true;
     }
 
@@ -75,7 +75,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
         // chunk metadata satisfy the condition
         initChunkReader(chunkMetaData);
 
-        if (chunkReader.hasNextBatch()) {
+        if (chunkReader.hasNextSatisfiedPage()) {
           return true;
         }
       }
@@ -87,7 +87,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
    * get next batch data.
    */
   public BatchData nextBatch() throws IOException {
-    data = chunkReader.nextBatch();
+    data = chunkReader.nextPageData();
     return data;
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
index 13be71b..1843229 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 
 /**
@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
   protected List<ChunkMetaData> chunkMetaDataList;
   private int currentChunkIndex = 0;
 
-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private long currentTimestamp;
   private BatchData data = null; // current batch data
 
@@ -69,8 +69,8 @@ public class FileSeriesReaderByTimestamp {
         return null;
       }
 
-      if (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      if (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
       } else {
         return null;
       }
@@ -93,8 +93,8 @@ public class FileSeriesReaderByTimestamp {
         }
         return null;
       } else {
-        if (chunkReader.hasNextBatch()) {
-          data = chunkReader.nextBatch();
+        if (chunkReader.hasNextSatisfiedPage()) {
+          data = chunkReader.nextPageData();
         } else if (!constructNextSatisfiedChunkReader()) {
           return null;
         }
@@ -115,16 +115,16 @@ public class FileSeriesReaderByTimestamp {
       if (data != null && data.hasCurrent()) {
         return true;
       }
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data != null && data.hasCurrent()) {
           return true;
         }
       }
     }
     while (constructNextSatisfiedChunkReader()) {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data != null && data.hasCurrent()) {
           return true;
         }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
index bcce85c..6a55bd1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
@@ -171,14 +171,11 @@ public class PageReaderTest {
         ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
 
         PageReader pageReader = new PageReader(page, dataType, decoder,
-            new DeltaBinaryDecoder.LongDeltaDecoder());
+            new DeltaBinaryDecoder.LongDeltaDecoder(), null);
 
         int index = 0;
         long startTimestamp = System.currentTimeMillis();
-        BatchData data = null;
-        if (pageReader.hasNextBatch()) {
-          data = pageReader.nextBatch();
-        }
+        BatchData data = pageReader.getAllSatisfiedPageData();
         assert data != null;
 
         while (data.hasCurrent()) {