You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/28 02:32:19 UTC

[iotdb] 01/01: tmp save before run

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16dfbebf40a23f22c8861010a00d9c48765f4f49
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 28 10:31:40 2022 +0800

    tmp save before run
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   8 +-
 ...erator.java => AbstractSeriesScanOperator.java} | 122 ++++------
 .../operator/source/AlignedSeriesScanOperator.java | 136 +----------
 .../operator/source/AlignedSeriesScanUtil.java     |   2 +
 .../operator/source/SeriesScanOperator.java        | 133 +---------
 .../execution/operator/source/SeriesScanUtil.java  | 269 ++++++++++++++++++++-
 .../query/reader/chunk/MemAlignedPageReader.java   |   7 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |   8 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |   2 +-
 .../iotdb/tsfile/read/reader/IPageReader.java      |  25 ++
 .../tsfile/read/reader/page/AlignedPageReader.java |   7 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |   7 +-
 12 files changed, 388 insertions(+), 338 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 337f26d487..2a67c0dd7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,13 +397,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = true;
+  private boolean enableUnseqSpaceCompaction = false;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private boolean enableCrossSpaceCompaction = false;
 
   /**
    * The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -558,7 +558,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 600000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index 05685f758d..ae3191626a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -16,52 +16,44 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-public class SeriesScanOperator implements DataSourceOperator {
+public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
 
   private final OperatorContext operatorContext;
   private final SeriesScanUtil seriesScanUtil;
   private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
+
   private boolean finished = false;
 
+  private final TsBlockBuilder resultBuilder;
+
   private final long maxReturnSize;
 
-  public SeriesScanOperator(
+  public AbstractSeriesScanOperator(
       PlanNodeId sourceId,
-      PartialPath seriesPath,
-      Set<String> allSensors,
-      TSDataType dataType,
-      OperatorContext context,
-      Filter timeFilter,
-      Filter valueFilter,
-      boolean ascending) {
+      SeriesScanUtil seriesScanUtil,
+      int subSensorSize,
+      OperatorContext context) {
     this.sourceId = sourceId;
     this.operatorContext = context;
-    this.seriesScanUtil =
-        new SeriesScanUtil(
-            seriesPath,
-            allSensors,
-            dataType,
-            context.getInstanceContext(),
-            timeFilter,
-            valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.seriesScanUtil = seriesScanUtil;
+    this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder();
+
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -71,49 +63,40 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
+    TsBlock block = resultBuilder.build();
+    resultBuilder.reset();
+    return block;
   }
 
   @Override
   public boolean hasNext() {
-
     try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
+      // start stopwatch
+      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      long start = System.nanoTime();
+
+      // use do-while to guarantee that we try to read data at least once
+      do {
+        // consume page data firstly
+        if (readPageData()) {
+          continue;
+        }
 
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
+        // consume chunk data secondly
         if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
+          continue;
         }
-      }
-      return hasCachedTsBlock;
+
+        // consume next file finally
+        if (readFileData()) {
+          continue;
+        }
+        break;
+
+      } while (System.nanoTime() - start < maxRuntime && !resultBuilder.isFull());
+
+      finished = resultBuilder.isEmpty();
+      return !finished;
     } catch (IOException e) {
       throw new RuntimeException("Error happened while scanning the file", e);
     }
@@ -121,7 +104,7 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = !hasNext());
+    return finished;
   }
 
   @Override
@@ -139,27 +122,26 @@ public class SeriesScanOperator implements DataSourceOperator {
     return 0L;
   }
 
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
+  private boolean readFileData() throws IOException {
+    while (seriesScanUtil.hasNextFile()) {
+      if (readChunkData()) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
+  private boolean readChunkData() throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
+  private boolean readPageData() throws IOException {
+    return seriesScanUtil.tryToFetchDataFromPage();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 8406437802..1adcce23b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -19,26 +19,13 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.HashSet;
 
-public class AlignedSeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final AlignedSeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
 
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
@@ -47,127 +34,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new AlignedSeriesScanUtil(
             seriesPath,
             new HashSet<>(seriesPath.getMeasurementList()),
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    // time + all value columns
-    this.maxReturnSize =
-        (1L + seriesPath.getMeasurementList().size())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
-  }
-
-  @Override
-  public boolean hasNext() {
-
-    try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
-        if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
-        }
-      }
-      return hasCachedTsBlock;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        seriesPath.getMeasurementList().size(),
+        context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index f41456b063..6cc0689003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -57,6 +58,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
     dataTypes =
         ((AlignedPath) seriesPath)
             .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+    cachedTsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 05685f758d..491607692b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -19,27 +19,14 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.Set;
 
-public class SeriesScanOperator implements DataSourceOperator {
-
-  private final OperatorContext operatorContext;
-  private final SeriesScanUtil seriesScanUtil;
-  private final PlanNodeId sourceId;
-  private TsBlock tsBlock;
-  private boolean hasCachedTsBlock = false;
-  private boolean finished = false;
-
-  private final long maxReturnSize;
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
 
   public SeriesScanOperator(
       PlanNodeId sourceId,
@@ -50,9 +37,8 @@ public class SeriesScanOperator implements DataSourceOperator {
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    this.sourceId = sourceId;
-    this.operatorContext = context;
-    this.seriesScanUtil =
+    super(
+        sourceId,
         new SeriesScanUtil(
             seriesPath,
             allSensors,
@@ -60,115 +46,8 @@ public class SeriesScanOperator implements DataSourceOperator {
             context.getInstanceContext(),
             timeFilter,
             valueFilter,
-            ascending);
-    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public TsBlock next() {
-    if (hasCachedTsBlock || hasNext()) {
-      hasCachedTsBlock = false;
-      TsBlock res = tsBlock;
-      tsBlock = null;
-      return res;
-    }
-    throw new IllegalStateException("no next batch");
-  }
-
-  @Override
-  public boolean hasNext() {
-
-    try {
-      if (hasCachedTsBlock) {
-        return true;
-      }
-
-      /*
-       * consume page data firstly
-       */
-      if (readPageData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume chunk data secondly
-       */
-      if (readChunkData()) {
-        hasCachedTsBlock = true;
-        return true;
-      }
-
-      /*
-       * consume next file finally
-       */
-      while (seriesScanUtil.hasNextFile()) {
-        if (readChunkData()) {
-          hasCachedTsBlock = true;
-          return true;
-        }
-      }
-      return hasCachedTsBlock;
-    } catch (IOException e) {
-      throw new RuntimeException("Error happened while scanning the file", e);
-    }
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished || (finished = !hasNext());
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return 0L;
-  }
-
-  private boolean readChunkData() throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      if (readPageData()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean readPageData() throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      tsBlock = seriesScanUtil.nextPage();
-      if (!isEmpty(tsBlock)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || tsBlock.isEmpty();
-  }
-
-  @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
-  }
-
-  @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+            ascending),
+        1,
+        context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..b667d08858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ public class SeriesScanUtil {
   protected final TSDataType dataType;
 
   // inner class of SeriesReader for order purpose
-  private TimeOrderUtils orderUtils;
+  private final TimeOrderUtils orderUtils;
 
   /*
    * There is at most one is not null between timeFilter and valueFilter
@@ -121,6 +121,8 @@ public class SeriesScanUtil {
   protected boolean hasCachedNextOverlappedPage;
   protected TsBlock cachedTsBlock;
 
+  protected TsBlockBuilder cachedTsBlockBuilder;
+
   public SeriesScanUtil(
       PartialPath seriesPath,
       Set<String> allSensors,
@@ -156,6 +158,10 @@ public class SeriesScanUtil {
         new PriorityQueue<>(
             orderUtils.comparingLong(
                 versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+
+    if (dataType != TSDataType.VECTOR) {
+      this.cachedTsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    }
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
@@ -450,6 +456,259 @@ public class SeriesScanUtil {
     return firstPageReader != null;
   }
 
+  public boolean tryToFetchDataFromPage() throws IOException {
+
+    /*
+     * has overlapped data
+     */
+    if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+      if (tryToBuildFromMergeReader()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+
+    /*
+     * construct first page reader
+     */
+    if (firstChunkMetadata != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToPageReaders(
+          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first chunk metadata is already unpacked, consume cached pages
+       */
+      initFirstPageReader();
+    }
+
+    if (tryToBuildFromOverlappedPage()) {
+      return true;
+    }
+
+    // make sure firstPageReader won't be null while seqPageReaders or unSeqPageReaders still
+    // have cached page readers
+    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+      initFirstPageReader();
+
+      if (tryToBuildFromOverlappedPage()) {
+        return true;
+      }
+    }
+
+    if (firstPageReader != null) {
+      buildFromPageReader();
+      return true;
+    }
+    return false;
+  }
+
+  private boolean tryToBuildFromOverlappedPage() throws IOException {
+    if (firstPageOverlapped()) {
+      // next page is overlapped, read overlapped data and cache it
+      return tryToBuildFromMergeReader();
+    }
+    return false;
+  }
+
+  public void buildFromPageReader() throws IOException {
+    /*
+     * next page is not overlapped, push down value filter if it exists
+     */
+    if (valueFilter != null) {
+      firstPageReader.setFilter(valueFilter);
+    }
+    firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder);
+    firstPageReader = null;
+  }
+
+  private boolean tryToBuildFromMergeReader() throws IOException {
+    int rawSize = cachedTsBlockBuilder.getPositionCount();
+
+    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+    while (true) {
+
+      // may have overlapped data
+      if (mergeReader.hasNextTimeValuePair()) {
+
+        // TODO we still need to consider data type, ascending and descending here
+        TimeColumnBuilder timeBuilder = cachedTsBlockBuilder.getTimeColumnBuilder();
+        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+        while (mergeReader.hasNextTimeValuePair()) {
+
+          /*
+           * get current first point in mergeReader, this maybe overlapped later
+           */
+          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+            /*
+             * when the merged point exceeds the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+             * 1. has cached batch data, we don't need to read more data, just use the cached data later
+             * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * we could just use the first page reader later
+             * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+             * we could use the first sequence page reader later
+             */
+            if (cachedTsBlockBuilder.getPositionCount() > rawSize
+                || firstPageReader != null
+                || !seqPageReaders.isEmpty()) {
+              break;
+            }
+            // so, we don't have other data except mergeReader
+            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          }
+
+          // unpack all overlapped data for the first timeValuePair
+          unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+              timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+          // update if there are unpacked unSeqPageReaders
+          timeValuePair = mergeReader.currentTimeValuePair();
+
+          // from now on, all unsequence readers are unpacked, so we can ignore them
+          // we have a first page reader now
+          if (firstPageReader != null) {
+            // if current timeValuePair exceeds the first page reader's end time (start time when
+            // descending), we just use the cached data
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < firstPageReader.getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+              // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+              // and update endTime to the max end time
+              mergeReader.addReader(
+                  getPointReader(
+                      firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  firstPageReader.version,
+                  orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime =
+                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
+              firstPageReader = null;
+            }
+          }
+
+          // the seq page readers are not empty, handle the first one just like first page reader
+          if (!seqPageReaders.isEmpty()) {
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        > seqPageReaders.get(0).getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < seqPageReaders.get(0).getStatistics().getStartTime())) {
+              return cachedTsBlockBuilder.getPositionCount() > rawSize;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+              VersionPageReader pageReader = seqPageReaders.remove(0);
+              mergeReader.addReader(
+                  getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
+                  pageReader.version,
+                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+            }
+          }
+
+          /*
+           * get the latest first point in mergeReader
+           */
+          timeValuePair = mergeReader.nextTimeValuePair();
+
+          Object valueForFilter = timeValuePair.getValue().getValue();
+
+          // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
+          // only accept AlignedPath with only one sub sensor
+          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+            for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+              if (tsPrimitiveType != null) {
+                valueForFilter = tsPrimitiveType.getValue();
+                break;
+              }
+            }
+          }
+
+          if (valueFilter == null
+              || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            switch (dataType) {
+              case BOOLEAN:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBoolean(timeValuePair.getValue().getBoolean());
+                break;
+              case INT32:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeInt(timeValuePair.getValue().getInt());
+                break;
+              case INT64:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeLong(timeValuePair.getValue().getLong());
+                break;
+              case FLOAT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeFloat(timeValuePair.getValue().getFloat());
+                break;
+              case DOUBLE:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeDouble(timeValuePair.getValue().getDouble());
+                break;
+              case TEXT:
+                cachedTsBlockBuilder
+                    .getColumnBuilder(0)
+                    .writeBinary(timeValuePair.getValue().getBinary());
+                break;
+              case VECTOR:
+                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                for (int i = 0; i < values.length; i++) {
+                  if (values[i] == null) {
+                    cachedTsBlockBuilder.getColumnBuilder(i).appendNull();
+                  } else {
+                    cachedTsBlockBuilder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                  }
+                }
+                break;
+              default:
+                throw new UnSupportedDataTypeException(String.valueOf(dataType));
+            }
+            cachedTsBlockBuilder.declarePosition();
+          }
+        }
+
+        /*
+         * if current overlapped page has valid data, return, otherwise read next overlapped page
+         */
+        if (cachedTsBlockBuilder.getPositionCount() > rawSize) {
+          return true;
+        } else if (mergeReader.hasNextTimeValuePair()) {
+          // condition: seqPage.endTime < mergeReader.currentTime
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+  }
+
   private boolean isExistOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       /*
@@ -1088,6 +1347,10 @@ public class SeriesScanUtil {
     return orderUtils;
   }
 
+  public TsBlockBuilder getCachedTsBlockBuilder() {
+    return cachedTsBlockBuilder;
+  }
+
   protected class VersionPageReader {
 
     protected PriorityMergeReader.MergeReaderPriority version;
@@ -1131,6 +1394,10 @@ public class SeriesScanUtil {
       return tsBlock;
     }
 
+    void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException {
+      data.getAllSatisfiedData(ascending, builder); // forwards both arguments to data
+    }
+
     void setFilter(Filter filter) {
       data.setFilter(filter);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 41f4d7c1fd..ae13babf9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -88,7 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() {
     builder.reset();
+    writeDataToBuilder(builder); // fill the reader's own builder, reset just above
+    return builder.build(); // writeDataToBuilder returns nothing, so build here
+  }
 
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
     boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
 
     for (int row = 0; row < tsBlock.getPositionCount(); row++) {
@@ -130,8 +135,6 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-
-    return builder.build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 0baf315eff..85f08de1c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -92,6 +92,13 @@ public class MemPageReader implements IPageReader {
   public TsBlock getAllSatisfiedData() {
     TSDataType dataType = chunkMetadata.getDataType();
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // fill the single-column builder created above
+    return builder.build(); // writeDataToBuilder returns nothing, so build here
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) {
+    TSDataType dataType = chunkMetadata.getDataType();
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     switch (dataType) {
@@ -164,7 +171,6 @@ public class MemPageReader implements IPageReader {
       default:
         throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
-    return builder.build();
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index d4152d4ddb..c309835a09 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -259,7 +259,7 @@ public class TsBlockBuilder {
   }
 
   public boolean isFull() {
-    return declaredPositions == MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull();
+    return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull(); // at or past max
   }
 
   public boolean isEmpty() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index a68f4590b1..a967cae19c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -37,6 +40,52 @@ public interface IPageReader {
 
   TsBlock getAllSatisfiedData() throws IOException;
 
+  /**
+   * Appends the data of this page to {@code builder} without building a TsBlock from it.
+   *
+   * @param builder the builder that receives the rows
+   * @throws IOException if reading the page fails
+   */
+  void writeDataToBuilder(TsBlockBuilder builder) throws IOException;
+
+  /**
+   * Appends the data of this page to {@code builder} in the requested order.
+   *
+   * <p>Ascending reads delegate to {@link #writeDataToBuilder(TsBlockBuilder)}. Descending reads
+   * fetch the page through {@link #getAllSatisfiedData()}, reverse it and copy it row by row. A
+   * null position of the page is appended as null.
+   *
+   * @param ascending {@code true} to append via {@code writeDataToBuilder}, {@code false} to
+   *     append the reversed page
+   * @param builder the builder that receives the rows
+   * @throws IOException if reading the page fails
+   */
+  default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException {
+    if (ascending) {
+      writeDataToBuilder(builder);
+      return;
+    }
+
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders();
+    int columnNum = valueColumnBuilders.length;
+
+    TsBlock tsBlock = getAllSatisfiedData();
+    tsBlock.reverse();
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      timeColumnBuilder.write(tsBlock.getTimeColumn(), i);
+      for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+        // Keep nulls as nulls: write(column, i) is not assumed to check for them.
+        if (tsBlock.getColumn(columnIndex).isNull(i)) {
+          valueColumnBuilders[columnIndex].appendNull();
+        } else {
+          valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i);
+        }
+      }
+      builder.declarePosition();
+    }
+  }
+
   Statistics getStatistics();
 
   void setFilter(Filter filter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..faae955198 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -116,6 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     builder.reset();
+    writeDataToBuilder(builder); // fill the reader's own builder, reset just above
+    return builder.build(); // writeDataToBuilder returns nothing, so build here
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     long[] timeBatch = timePageReader.getNextTimeBatch();
 
     // if all the sub sensors' value are null in current row, just discard it
@@ -189,7 +195,6 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
         }
       }
     }
-    return builder.build();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index e1fce8ff65..190bb11b5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -161,6 +161,12 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData() throws IOException {
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType));
+    writeDataToBuilder(builder); // fill the single-column builder created above
+    return builder.build(); // writeDataToBuilder returns nothing, so build here
+  }
+
+  @Override
+  public void writeDataToBuilder(TsBlockBuilder builder) throws IOException {
     TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
     ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {
@@ -235,7 +241,6 @@ public class PageReader implements IPageReader {
           throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return builder.build();
   }
 
   @Override