You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/28 02:32:18 UTC

[iotdb] branch lmh/scanOpBatchProcess created (now 16dfbebf40)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/scanOpBatchProcess
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 16dfbebf40 tmp save before run

This branch includes the following new commits:

     new 16dfbebf40 tmp save before run

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: tmp save before run

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16dfbebf40a23f22c8861010a00d9c48765f4f49
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 28 10:31:40 2022 +0800

    tmp save before run
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   8 +-
 ...erator.java => AbstractSeriesScanOperator.java} | 122 ++++------
 .../operator/source/AlignedSeriesScanOperator.java | 136 +----------
 .../operator/source/AlignedSeriesScanUtil.java     |   2 +
 .../operator/source/SeriesScanOperator.java        | 133 +---------
 .../execution/operator/source/SeriesScanUtil.java  | 269 ++++++++++++++++++++-
 .../query/reader/chunk/MemAlignedPageReader.java   |   7 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |   8 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |   2 +-
 .../iotdb/tsfile/read/reader/IPageReader.java      |  25 ++
 .../tsfile/read/reader/page/AlignedPageReader.java |   7 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |   7 +-
 12 files changed, 388 insertions(+), 338 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 337f26d487..2a67c0dd7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,13 +397,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = true;
+  private boolean enableUnseqSpaceCompaction = false;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private boolean enableCrossSpaceCompaction = false;
 
   /**
    * The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -558,7 +558,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 600000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index 05685f758d..ae3191626a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -16,52 +16,44 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-public class SeriesScanOperator implements DataSourceOperator {
+public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
 
   private final OperatorContext operatorContext;
   private final SeriesScanUtil seriesScanUtil;
   private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
+
   private boolean finished = false;
 
+  private final TsBlockBuilder resultBuilder;
+
   private final long maxReturnSize;
 
-  public SeriesScanOperator(
+  public AbstractSeriesScanOperator(
       PlanNodeId sourceId,
-      PartialPath seriesPath,
-      Set<String> allSensors,
-      TSDataType dataType,
-      OperatorContext context,
-      Filter timeFilter,
-      Filter valueFilter,
-      boolean ascending) {
+      SeriesScanUtil seriesScanUtil,
+      int subSensorSize,
+      OperatorContext context) {
     this.sourceId = sourceId;
     this.operatorContext = context;
-    this.seriesScanUtil =
-        new SeriesScanUtil(
-            seriesPath,
-            allSensors,
-            dataType,
-            context.getInstanceContext(),
-            timeFilter,
-            valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.seriesScanUtil = seriesScanUtil;
+    this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
+
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -71,49 +63,40 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
+    TsBlock block = resultBuilder.build();
+    resultBuilder.reset();
+    return block;
   }
 
   @Override
   public boolean hasNext() {
-
     try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
+      // start stopwatch
+      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      long start = System.nanoTime();
+
+      // use do-while to guarantee the loop body runs at least once
+      do {
+        // consume page data firstly
+        if (readPageData()) {
+          continue;
+        }
 
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
+        // consume chunk data secondly
         if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
+          continue;
         }
-      }
-      return hasCachedTsBlock;
+
+        // consume next file finally
+        if (readFileData()) {
+          continue;
+        }
+        break;
+
+      } while (System.nanoTime() - start < maxRuntime && !resultBuilder.isFull());
+
+      finished = resultBuilder.isEmpty();
+      return !finished;
     } catch (IOException e) {
       throw new RuntimeException("Error happened while scanning the file", e);
     }
@@ -121,7 +104,7 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = !hasNext());
+    return finished;
   }
 
   @Override
@@ -139,27 +122,26 @@ public class SeriesScanOperator implements DataSourceOperator {
     return 0L;
   }
 
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
+  private boolean readFileData() throws IOException {
+    while (seriesScanUtil.hasNextFile()) {
+      if (readChunkData()) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
+  private boolean readChunkData() throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
+  private boolean readPageData() throws IOException {
+    return seriesScanUtil.tryToFetchDataFromPage();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 8406437802..1adcce23b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,26 +19,13 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.HashSet;
 
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final AlignedSeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
 
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
@@ -47,127 +34,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new AlignedSeriesScanUtil(
             seriesPath,
             new HashSet<>(seriesPath.getMeasurementList()),
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    // time + all value columns
-    this.maxReturnSize =
-        (1L + seriesPath.getMeasurementList().size())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
-  }
-
-  @Override
-  public boolean hasNext() {
-
-    try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
-        if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
-        }
-      }
-      return hasCachedTsBlock;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        seriesPath.getMeasurementList().size(),
+        context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index f41456b063..6cc0689003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -57,6 +58,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
     dataTypes =
         ((AlignedPath) seriesPath)
             .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+    cachedTsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 05685f758d..491607692b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,27 +19,14 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.Set;
 
-public class SeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final SeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
 
   public SeriesScanOperator(
       PlanNodeId sourceId,
@@ -50,9 +37,8 @@ public class SeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new SeriesScanUtil(
             seriesPath,
             allSensors,
@@ -60,115 +46,8 @@ public class SeriesScanOperator implements DataSourceOperator {
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
-  }
-
-  @Override
-  public boolean hasNext() {
-
-    try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
-        if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
-        }
-      }
-      return hasCachedTsBlock;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        1,
+        context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..b667d08858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ public class SeriesScanUtil {
   protected final TSDataType dataType;
 
   // inner class of SeriesReader for order purpose
-  private TimeOrderUtils orderUtils;
+  private final TimeOrderUtils orderUtils;
 
   /*
    * There is at most one is not null between timeFilter and valueFilter
@@ -121,6 +121,8 @@ public class SeriesScanUtil {
   protected boolean hasCachedNextOverlappedPage;
   protected TsBlock cachedTsBlock;
 
+  protected TsBlockBuilder cachedTsBlockBuilder;
+
   public SeriesScanUtil(
       PartialPath seriesPath,
       Set<String> allSensors,
@@ -156,6 +158,10 @@ public class SeriesScanUtil {
         new PriorityQueue<>(
             orderUtils.comparingLong(
                 versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+
+    if (dataType != TSDataType.VECTOR) {
+      this.cachedTsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    }
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
@@ -450,6 +456,259 @@ public class SeriesScanUtil {
     return firstPageReader != null;
   }
 
+  public boolean tryToFetchDataFromPage() throws IOException {
+
+    /*
+     * has overlapped data
+     */
+    if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+      if (tryToBuildFromMergeReader()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+
+    /*
+     * construct first page reader
+     */
+    if (firstChunkMetadata != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToPageReaders(
+          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first chunk metadata is already unpacked, consume cached pages
+       */
+      initFirstPageReader();
+    }
+
+    if (tryToBuildFromOverlappedPage()) {
+      return true;
+    }
+
+    // make sure firstPageReader won't be null while seqPageReaders or unSeqPageReaders still
+    // hold cached page readers
+    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+      initFirstPageReader();
+
+      if (tryToBuildFromOverlappedPage()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+    return false;
+  }
+
+  private boolean tryToBuildFromOverlappedPage() throws IOException {
+    if (firstPageOverlapped()) {
+      // next page is overlapped, read overlapped data and cache it
+      return tryToBuildFromMergeReader();
+    }
+    return false;
+  }
+
+  public void buildFromPageReader() throws IOException {
+    /*
+     * next page is not overlapped, push down value filter if it exists
+     */
+    if (valueFilter != null) {
+      firstPageReader.setFilter(valueFilter);
+    }
+    firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder);
+    firstPageReader = null;
+  }
+
+  private boolean tryToBuildFromMergeReader() throws IOException {
+    int rawSize = cachedTsBlockBuilder.getPositionCount();
+
+    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+    while (true) {
+
+      // may have overlapped data
+      if (mergeReader.hasNextTimeValuePair()) {
+
+        // TODO we still need to consider data type, ascending and descending here
+        TimeColumnBuilder timeBuilder = cachedTsBlockBuilder.getTimeColumnBuilder();
+        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+        while (mergeReader.hasNextTimeValuePair()) {
+
+          /*
+           * get current first point in mergeReader, this maybe overlapped later
+           */
+          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+            /*
+             * when the merged point exceeds currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+             * 1. rows were appended to the builder in this call: we don't need to read more data, just use the cached data later
+             * 2. there is a first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * so we can just use the first page reader later
+             * 3. the sequence page readers are not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * so we can use the first sequence page reader later
+             */
+            if (cachedTsBlockBuilder.getPositionCount() > rawSize
+                || firstPageReader != null
+                || !seqPageReaders.isEmpty()) {
+              break;
+            }
+            // so, we don't have other data except mergeReader
+            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          }
+
+          // unpack all overlapped data for the first timeValuePair
+          unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+              timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+          // update if there are unpacked unSeqPageReaders
+          timeValuePair = mergeReader.currentTimeValuePair();
+
+          // from now on, the unsequence readers are all unpacked, so we don't need to consider them
+          // we have a first page reader now
+          if (firstPageReader != null) {
+            // if current timeValuePair exceeds the first page reader's end time, we just use the
+            // cached data
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < firstPageReader.getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+              // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+              // and update endTime to the max end time
+              mergeReader.addReader(
+                  getPointReader(
+                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  firstPageReader.version,
+                  orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime =
+                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
+              firstPageReader = null;
+            }
+          }
+
+          // seq page readers are not empty: treat the first one like firstPageReader
+          if (!seqPageReaders.isEmpty()) {
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        > seqPageReaders.get(0).getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < seqPageReaders.get(0).getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+              VersionPageReader pageReader = seqPageReaders.remove(0);
+              mergeReader.addReader(
+                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  pageReader.version,
+                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+            }
+          }
+
+          /*
+           * get the latest first point in mergeReader
+           */
+          timeValuePair = mergeReader.nextTimeValuePair();
+
+          Object valueForFilter = timeValuePair.getValue().getValue();
+
+          // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
+          // only accept AlignedPath with only one sub sensor
+          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+            for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+              if (tsPrimitiveType != null) {
+                valueForFilter = tsPrimitiveType.getValue();
+                break;
+              }
+            }
+          }
+
+          if (valueFilter == null
+              || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            switch (dataType) {
+              case BOOLEAN:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBoolean(timeValuePair.getValue().getBoolean());
+                break;
+              case INT32:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeInt(timeValuePair.getValue().getInt());
+                break;
+              case INT64:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeLong(timeValuePair.getValue().getLong());
+                break;
+              case FLOAT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeFloat(timeValuePair.getValue().getFloat());
+                break;
+              case DOUBLE:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeDouble(timeValuePair.getValue().getDouble());
+                break;
+              case TEXT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBinary(timeValuePair.getValue().getBinary());
+                break;
+              case VECTOR:
+                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                for (int i = 0; i < values.length; i++) {
+                  if (values[i] == null) {
+                    cachedTsBlockBuilder.getColumnBuilder(i).appendNull();
+                  } else {
+                    cachedTsBlockBuilder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                  }
+                }
+                break;
+              default:
+                throw new UnSupportedDataTypeException(String.valueOf(dataType));
+            }
+            cachedTsBlockBuilder.declarePosition();
+          }
+        }
+
+        /*
+         * if current overlapped page has valid data, return, otherwise read next overlapped page
+         */
+        if (cachedTsBlockBuilder.getPositionCount() > rawSize) {
+          return true;
+        } else if (mergeReader.hasNextTimeValuePair()) {
+          // condition: seqPage.endTime < mergeReader.currentTime
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+  }
+
   private boolean isExistOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       /*
@@ -1088,6 +1347,10 @@ public class SeriesScanUtil {
     return orderUtils;
   }
 
+  public TsBlockBuilder getCachedTsBlockBuilder() { // plain accessor, no side effects
+    return cachedTsBlockBuilder; // hands out the builder reference itself, not a copy
+  }
+
   protected class VersionPageReader {
 
     protected PriorityMergeReader.MergeReaderPriority version;
@@ -1131,6 +1394,10 @@ public class SeriesScanUtil {
       return tsBlock;
     }
 
+    void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException {
+      data.getAllSatisfiedData(ascending, builder); // forwards both arguments unchanged to data
+    }
+
     void setFilter(Filter filter) {
       data.setFilter(filter);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 41f4d7c1fd..ae13babf9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -88,7 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() {
     builder.reset();
+    writeDataToBuilder(builder); // fill this reader's own builder, reset just above
+    return builder.build(); // build() now happens here instead of inside the writer
+  }
 
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
     boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
 
     for (int row = 0; row < tsBlock.getPositionCount(); row++) {
@@ -130,8 +135,6 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-
-    return builder.build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 0baf315eff..85f08de1c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -92,6 +92,13 @@ public class MemPageReader implements IPageReader {
   public TsBlock getAllSatisfiedData() {
     TSDataType dataType = chunkMetadata.getDataType();
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // fill the fresh single-column builder created above
+    return builder.build(); // build() now happens here instead of inside the writer
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
+    TSDataType dataType = chunkMetadata.getDataType();
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     switch (dataType) {
@@ -164,7 +171,6 @@ public class MemPageReader implements IPageReader {
       default:
         throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
-    return builder.build();
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index d4152d4ddb..c309835a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -259,7 +259,7 @@ public class TsBlockBuilder {
   }
 
   public boolean isFull() {
-    return declaredPositions == MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
+    return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull(); // >=: an overshoot still reads as full
   }
 
   public boolean isEmpty() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index a68f4590b1..a967cae19c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -37,6 +40,28 @@ public interface IPageReader {
 
   TsBlock getAllSatisfiedData() throws IOException;
 
+  void writeDataToBuilder(TsBlockBuilder builder) throws IOException; // callers build() afterwards
+
+  default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException {
+    if (ascending) {
+      writeDataToBuilder(builder); // write straight into the caller's builder
+    } else {
+      TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders();
+      int columnNum = valueColumnBuilders.length; // NOTE(review): assumes tsBlock has as many columns
+
+      TsBlock tsBlock = getAllSatisfiedData(); // materialize this page as a separate block first
+      tsBlock.reverse(); // presumably an in-place row reversal (result unused) - TODO confirm
+      for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+        timeColumnBuilder.write(tsBlock.getTimeColumn(), i); // copy row i's time
+        for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+          valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i);
+        }
+        builder.declarePosition(); // one declared position per copied row
+      }
+    }
+  }
+
   Statistics getStatistics();
 
   void setFilter(Filter filter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..faae955198 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -116,6 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     builder.reset();
+    writeDataToBuilder(builder); // fill this reader's own builder, reset just above
+    return builder.build(); // build() now happens here instead of inside the writer
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     long[] timeBatch = timePageReader.getNextTimeBatch();
 
     // if all the sub sensors' value are null in current row, just discard it
@@ -189,7 +195,6 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-    return builder.build();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index e1fce8ff65..190bb11b5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -161,6 +161,12 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // fill the fresh single-column builder created above
+    return builder.build(); // build() now happens here instead of inside the writer
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {
@@ -235,7 +241,6 @@ public class PageReader implements IPageReader {
           throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return builder.build();
   }
 
   @Override