You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/02/08 03:34:33 UTC

[iotdb] 02/03: modify IPageReader & merge reader

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/limitPushDown
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c3cfd75a8d7853ad6812c41e4c3e50884871e9a3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Feb 8 11:32:15 2023 +0800

    modify IPageReader & merge reader
---
 .../execution/operator/source/SeriesScanUtil.java  | 35 +++++++---
 .../query/reader/chunk/MemAlignedPageReader.java   | 26 ++++++-
 .../iotdb/db/query/reader/chunk/MemPageReader.java | 80 ++++++++++++++++++++--
 .../tsfile/read/reader/page/AlignedPageReader.java | 27 +++++++-
 .../iotdb/tsfile/read/reader/page/PageReader.java  | 71 ++++++++++++++++---
 .../tsfile/read/reader/page/ValuePageReader.java   |  5 +-
 .../read/reader/series/PaginationController.java   | 10 ---
 7 files changed, 209 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 7a35026449..6217b7a6d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -130,10 +129,12 @@ public class SeriesScanUtil {
     this.allSensors = allSensors;
     this.dataType = dataType;
     this.context = context;
+
     this.globalTimeFilter = globalTimeFilter;
-    if(!queryFilter.equals(globalTimeFilter)) {
+    if (!queryFilter.equals(globalTimeFilter)) {
       this.queryFilter = queryFilter;
     }
+    this.paginationController = new PaginationController(0, 0);
 
     if (ascending) {
       this.orderUtils = new AscTimeOrderUtils();
@@ -156,8 +157,6 @@ public class SeriesScanUtil {
         new PriorityQueue<>(
             orderUtils.comparingLong(
                 versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
-
-    paginationController = new PaginationController(0, 0);
   }
 
   public SeriesScanUtil(
@@ -171,7 +170,7 @@ public class SeriesScanUtil {
       int limit,
       int offset) {
     this(seriesPath, allSensors, dataType, context, globalTimeFilter, queryFilter, ascending);
-    paginationController = new PaginationController(limit, offset);
+    this.paginationController = new PaginationController(limit, offset);
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
@@ -781,8 +780,15 @@ public class SeriesScanUtil {
               }
             }
 
-            if (queryFilter == null
-                || queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            if (queryFilter != null
+                && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timeValuePair.getTimestamp());
               switch (dataType) {
                 case BOOLEAN:
@@ -817,6 +823,9 @@ public class SeriesScanUtil {
                   throw new UnSupportedDataTypeException(String.valueOf(dataType));
               }
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           hasCachedNextOverlappedPage = !builder.isEmpty();
@@ -1291,7 +1300,8 @@ public class SeriesScanUtil {
       while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
         TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
         if (tsFileResource != null
-            && tsFileResource.isSatisfied(seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
           break;
         }
         curSeqFileIndex--;
@@ -1304,7 +1314,8 @@ public class SeriesScanUtil {
       while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
         TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
         if (tsFileResource != null
-            && tsFileResource.isSatisfied(seriesPath.getDevice(), getGlobalTimeFilter(), false, false)) {
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), getGlobalTimeFilter(), false, false)) {
           break;
         }
         curUnseqFileIndex++;
@@ -1407,7 +1418,8 @@ public class SeriesScanUtil {
       while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
         TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
         if (tsFileResource != null
-            && tsFileResource.isSatisfied(seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
           break;
         }
         curSeqFileIndex++;
@@ -1420,7 +1432,8 @@ public class SeriesScanUtil {
       while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
         TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
         if (tsFileResource != null
-            && tsFileResource.isSatisfied(seriesPath.getDevice(), getGlobalTimeFilter(), false, false)) {
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), getGlobalTimeFilter(), false, false)) {
           break;
         }
         curUnseqFileIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 41f4d7c1fd..81c178e8c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
@@ -40,7 +41,10 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
 
   private final TsBlock tsBlock;
   private final AlignedChunkMetadata chunkMetadata;
+
   private Filter valueFilter;
+  private PaginationController paginationController;
+
   private TsBlockBuilder builder;
 
   public MemAlignedPageReader(TsBlock tsBlock, AlignedChunkMetadata chunkMetadata, Filter filter) {
@@ -109,15 +113,28 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
     }
 
     // build time column
+    int readEndIndex = tsBlock.getPositionCount();
     for (int row = 0; row < tsBlock.getPositionCount(); row++) {
-      if (satisfyInfo[row] && hasValue[row]) {
+      if (!satisfyInfo[row] || !hasValue[row]) {
+        continue;
+      }
+      if (paginationController.hasCurOffset()) {
+        paginationController.consumeOffset();
+        satisfyInfo[row] = false;
+        continue;
+      }
+      if (paginationController.hasCurLimit()) {
         builder.getTimeColumnBuilder().writeLong(tsBlock.getTimeByIndex(row));
         builder.declarePosition();
+        paginationController.consumeLimit();
+      } else {
+        readEndIndex = row + 1;
+        break;
       }
     }
 
     // build value column
-    for (int column = 0; column < tsBlock.getValueColumnCount(); column++) {
+    for (int column = 0; column < readEndIndex; column++) {
       Column valueColumn = tsBlock.getColumn(column);
       ColumnBuilder valueBuilder = builder.getColumnBuilder(column);
       for (int row = 0; row < tsBlock.getPositionCount(); row++) {
@@ -158,6 +175,11 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
     }
   }
 
+  @Override
+  public void setLimitOffset(PaginationController paginationController) {
+    this.paginationController = paginationController;
+  }
+
   @Override
   public boolean isModified() {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 0baf315eff..bf899e57fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
@@ -41,7 +42,9 @@ public class MemPageReader implements IPageReader {
 
   private final TsBlock tsBlock;
   private final IChunkMetadata chunkMetadata;
+
   private Filter valueFilter;
+  private PaginationController paginationController;
 
   public MemPageReader(TsBlock tsBlock, IChunkMetadata chunkMetadata, Filter filter) {
     this.tsBlock = tsBlock;
@@ -99,10 +102,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           boolean value = tsBlock.getColumn(0).getBoolean(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeBoolean(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -110,10 +123,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           int value = tsBlock.getColumn(0).getInt(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeInt(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -121,10 +144,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           long value = tsBlock.getColumn(0).getLong(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeLong(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -132,10 +165,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           float value = tsBlock.getColumn(0).getFloat(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeFloat(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -143,10 +186,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           double value = tsBlock.getColumn(0).getDouble(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeDouble(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -154,10 +207,20 @@ public class MemPageReader implements IPageReader {
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
           long time = tsBlock.getTimeColumn().getLong(i);
           Binary value = tsBlock.getColumn(0).getBinary(i);
-          if (valueFilter == null || valueFilter.satisfy(time, value)) {
+          if (valueFilter != null && !valueFilter.satisfy(time, value)) {
+            continue;
+          }
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+            continue;
+          }
+          if (paginationController.hasCurLimit()) {
             timeBuilder.writeLong(time);
             valueBuilder.writeBinary(value);
             builder.declarePosition();
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
         }
         break;
@@ -181,6 +244,11 @@ public class MemPageReader implements IPageReader {
     }
   }
 
+  @Override
+  public void setLimitOffset(PaginationController paginationController) {
+    this.paginationController = paginationController;
+  }
+
   @Override
   public boolean isModified() {
     return false;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..bf0900b34b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.slf4j.Logger;
@@ -49,7 +50,10 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   private final TimePageReader timePageReader;
   private final List<ValuePageReader> valuePageReaderList;
   private final int valueCount;
+
   private Filter filter;
+  private PaginationController paginationController;
+
   private boolean isModified;
   private TsBlockBuilder builder;
 
@@ -168,10 +172,22 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
     }
 
     // construct time column
+    int readEndIndex = timeBatch.length;
     for (int i = 0; i < timeBatch.length; i++) {
-      if (keepCurrentRow[i]) {
+      if (!keepCurrentRow[i]) {
+        continue;
+      }
+      if (paginationController.hasCurOffset()) {
+        paginationController.consumeOffset();
+        keepCurrentRow[i] = false;
+        continue;
+      }
+      if (paginationController.hasCurLimit()) {
         builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
         builder.declarePosition();
+      } else {
+        readEndIndex = i + 1;
+        break;
       }
     }
 
@@ -180,9 +196,9 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
       ValuePageReader pageReader = valuePageReaderList.get(i);
       if (pageReader != null) {
         pageReader.writeColumnBuilderWithNextBatch(
-            timeBatch, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
+            timeBatch, readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
       } else {
-        for (int j = 0; j < timeBatch.length; j++) {
+        for (int j = 0; j < readEndIndex; j++) {
           if (keepCurrentRow[j]) {
             builder.getColumnBuilder(i).appendNull();
           }
@@ -227,6 +243,11 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
     }
   }
 
+  @Override
+  public void setLimitOffset(PaginationController paginationController) {
+    this.paginationController = paginationController;
+  }
+
   @Override
   public boolean isModified() {
     return isModified;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 5a747a6053..98d7a2ade5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -174,7 +174,6 @@ public class PageReader implements IPageReader {
             if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, aBoolean))) {
               continue;
             }
-
             if (paginationController.hasCurOffset()) {
               paginationController.consumeOffset();
               continue;
@@ -193,10 +192,20 @@ public class PageReader implements IPageReader {
           while (timeDecoder.hasNext(timeBuffer)) {
             long timestamp = timeDecoder.readLong(timeBuffer);
             int anInt = valueDecoder.readInt(valueBuffer);
-            if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
+            if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, anInt))) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timestamp);
               valueBuilder.writeInt(anInt);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           break;
@@ -204,10 +213,20 @@ public class PageReader implements IPageReader {
           while (timeDecoder.hasNext(timeBuffer)) {
             long timestamp = timeDecoder.readLong(timeBuffer);
             long aLong = valueDecoder.readLong(valueBuffer);
-            if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+            if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, aLong))) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timestamp);
               valueBuilder.writeLong(aLong);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           break;
@@ -215,10 +234,20 @@ public class PageReader implements IPageReader {
           while (timeDecoder.hasNext(timeBuffer)) {
             long timestamp = timeDecoder.readLong(timeBuffer);
             float aFloat = valueDecoder.readFloat(valueBuffer);
-            if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+            if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, aFloat))) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timestamp);
               valueBuilder.writeFloat(aFloat);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           break;
@@ -226,10 +255,20 @@ public class PageReader implements IPageReader {
           while (timeDecoder.hasNext(timeBuffer)) {
             long timestamp = timeDecoder.readLong(timeBuffer);
             double aDouble = valueDecoder.readDouble(valueBuffer);
-            if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+            if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, aDouble))) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timestamp);
               valueBuilder.writeDouble(aDouble);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           break;
@@ -237,10 +276,20 @@ public class PageReader implements IPageReader {
           while (timeDecoder.hasNext(timeBuffer)) {
             long timestamp = timeDecoder.readLong(timeBuffer);
             Binary aBinary = valueDecoder.readBinary(valueBuffer);
-            if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
+            if (isDeleted(timestamp) || (filter != null && !filter.satisfy(timestamp, aBinary))) {
+              continue;
+            }
+            if (paginationController.hasCurOffset()) {
+              paginationController.consumeOffset();
+              continue;
+            }
+            if (paginationController.hasCurLimit()) {
               timeBuilder.writeLong(timestamp);
               valueBuilder.writeBinary(aBinary);
               builder.declarePosition();
+              paginationController.consumeLimit();
+            } else {
+              break;
             }
           }
           break;
@@ -265,6 +314,11 @@ public class PageReader implements IPageReader {
     }
   }
 
+  @Override
+  public void setLimitOffset(PaginationController paginationController) {
+    this.paginationController = paginationController;
+  }
+
   public void setDeleteIntervalList(List<TimeRange> list) {
     this.deleteIntervalList = list;
   }
@@ -281,11 +335,6 @@ public class PageReader implements IPageReader {
   @Override
   public void initTsBlockBuilder(List<TSDataType> dataTypes) {}
 
-  @Override
-  public void setLimitOffset(PaginationController paginationController) {
-    this.paginationController = paginationController;
-  }
-
   protected boolean isDeleted(long timestamp) {
     while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
       if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index 45729ec05e..5e75ddaf42 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -243,18 +243,19 @@ public class ValuePageReader {
 
   public void writeColumnBuilderWithNextBatch(
       long[] timeBatch,
+      int readEndIndex,
       ColumnBuilder columnBuilder,
       boolean[] keepCurrentRow,
       boolean[] isDeleted) {
     if (valueBuffer == null) {
-      for (int i = 0, n = timeBatch.length; i < n; i++) {
+      for (int i = 0; i < readEndIndex; i++) {
         if (keepCurrentRow[i]) {
           columnBuilder.appendNull();
         }
       }
       return;
     }
-    for (int i = 0, n = timeBatch.length; i < n; i++) {
+    for (int i = 0; i < readEndIndex; i++) {
       if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
         if (keepCurrentRow[i]) {
           columnBuilder.appendNull();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
index 1cda7ac555..d9b15bc6f1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/PaginationController.java
@@ -47,10 +47,6 @@ public class PaginationController {
     return !hasLimit || curLimit > 0;
   }
 
-  public boolean hasCurLimit(int rowCount) {
-    return !hasLimit || curLimit > rowCount;
-  }
-
   public void consumeOffset(int rowCount) {
     curOffset -= rowCount;
   }
@@ -59,12 +55,6 @@ public class PaginationController {
     curOffset--;
   }
 
-  public void consumeLimit(int rowCount) {
-    if (hasLimit) {
-      curLimit -= rowCount;
-    }
-  }
-
   public void consumeLimit() {
     if (hasLimit) {
       curLimit--;