Posted to commits@iotdb.apache.org by sp...@apache.org on 2023/03/04 13:44:16 UTC

[iotdb] branch research/column-compaction updated: add multi-column compaction (#9214)

This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch research/column-compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/column-compaction by this push:
     new 45d49dce74 add multi-column compaction (#9214)
45d49dce74 is described below

commit 45d49dce745fbedf16991f8e2295ae9ab544e232
Author: Chenguang Fang <th...@163.com>
AuthorDate: Sat Mar 4 21:44:10 2023 +0800

    add multi-column compaction (#9214)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   38 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   11 +
 .../db/engine/compaction/CompactionScheduler.java  |    2 +
 .../sizetiered/SizeTieredCompactionSelector.java   |  470 ++++++-
 .../inner/sizetiered/SizeTieredCompactionTask.java |    8 -
 .../apache/iotdb/db/metadata/path/AlignedPath.java |   24 +
 .../apache/iotdb/db/metadata/path/PartialPath.java |   13 +
 .../db/query/control/QueryResourceManager.java     |    7 +
 .../query/reader/chunk/DiskAlignedChunkLoader.java |   16 +
 .../reader/series/AlignedSeriesBitmapReader.java   |  174 +++
 .../db/query/reader/series/SeriesBitmapReader.java | 1474 ++++++++++++++++++++
 .../reader/series/SeriesRawDataPrefetchReader.java |  169 +++
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   17 +
 .../read/reader/chunk/AlignedChunkReader.java      |  129 ++
 .../reader/page/AlignedPagePrefetchReader.java     |  148 ++
 .../read/reader/page/ValuePageBitmapReader.java    |  237 ++++
 16 files changed, 2914 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b195d3df55..1ac6569d55 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -374,13 +374,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
   private boolean enableUnseqSpaceCompaction = true;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private boolean enableCrossSpaceCompaction = false;
 
   /**
    * The strategy of inner space compaction task. There is just one inner space compaction strategy
@@ -409,6 +409,12 @@ public class IoTDBConfig {
   /** The target tsfile size in compaction, 1 GB by default */
   private long targetCompactionFileSize = 1073741824L;
 
+  private int maxCompactionLevel = 100;
+
+  private int maxFileNumInLevel = 50;
+
+  private String compactionSelectFileMethod = "mcc";
+
   /** The target chunk size in compaction. */
   private long targetChunkSize = 1048576L;
 
@@ -443,10 +449,10 @@ public class IoTDBConfig {
   private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
 
   /** The interval of compaction task scheduling in each virtual storage group. The unit is ms. */
-  private long compactionScheduleIntervalInMs = 60_000L;
+  private long compactionScheduleIntervalInMs = 1_000L;
 
   /** The interval of compaction task submission from queue in CompactionTaskManager */
-  private long compactionSubmissionIntervalInMs = 60_000L;
+  private long compactionSubmissionIntervalInMs = 1_000L;
 
   /**
    * The number of sub compaction threads to be set up to perform compaction. Currently only works
@@ -2547,10 +2553,34 @@ public class IoTDBConfig {
     return targetCompactionFileSize;
   }
 
+  public int getMaxCompactionLevel() {
+    return maxCompactionLevel;
+  }
+
+  public int getMaxFileNumInLevel() {
+    return maxFileNumInLevel;
+  }
+
+  public String getCompactionSelectFileMethod() {
+    return compactionSelectFileMethod;
+  }
+
   public void setTargetCompactionFileSize(long targetCompactionFileSize) {
     this.targetCompactionFileSize = targetCompactionFileSize;
   }
 
+  public void setMaxCompactionLevel(int maxCompactionLevel) {
+    this.maxCompactionLevel = maxCompactionLevel;
+  }
+
+  public void setMaxFileNumInLevel(int maxFileNumInLevel) {
+    this.maxFileNumInLevel = maxFileNumInLevel;
+  }
+
+  public void setCompactionSelectFileMethod(String compactionSelectFileMethod) {
+    this.compactionSelectFileMethod = compactionSelectFileMethod;
+  }
+
   public long getTargetChunkSize() {
     return targetChunkSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2616bbb15e..fb8142d406 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -553,6 +553,17 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "concurrent_compaction_thread",
                 Integer.toString(conf.getConcurrentCompactionThread()))));
+    conf.setMaxCompactionLevel(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_compaction_level", Integer.toString(conf.getMaxCompactionLevel()))));
+    conf.setMaxFileNumInLevel(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_file_num_in_level", Integer.toString(conf.getMaxFileNumInLevel()))));
+    conf.setCompactionSelectFileMethod(
+        properties.getProperty(
+            "compaction_select_file_method", conf.getCompactionSelectFileMethod()));
     conf.setTargetCompactionFileSize(
         Long.parseLong(
             properties.getProperty(
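
A note on the three settings wired in above: a minimal sketch of how they
might be set, assuming they go in the server's iotdb-engine.properties file
(the values shown are simply the in-code defaults from IoTDBConfig):

    # highest file level the selector will still consider (default 100)
    max_compaction_level=100
    # unsequence selection only kicks in once a level holds more files than this (default 50)
    max_file_num_in_level=50
    # "oldest" and "round" choose the dedicated strategies in
    # SizeTieredCompactionSelector; any other value, including the default
    # "mcc", falls through to the greedy bitmap-overlap selection
    compaction_select_file_method=mcc
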
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index 62dc5f424e..0fefe484d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -46,6 +46,7 @@ public class CompactionScheduler {
     if (!tsFileManager.isAllowCompaction()) {
       return;
     }
+
     tryToSubmitCrossSpaceCompactionTask(
         tsFileManager.getStorageGroupName(),
         tsFileManager.getVirtualStorageGroup(),
@@ -53,6 +54,7 @@ public class CompactionScheduler {
         timePartition,
         tsFileManager,
         new CrossSpaceCompactionTaskFactory());
+
     tryToSubmitInnerSpaceCompactionTask(
         tsFileManager.getStorageGroupName(),
         tsFileManager.getVirtualStorageGroup(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index ee4a45d9db..9c453ffbd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -24,22 +24,34 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionSelector;
 import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
+import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataPrefetchReader;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * SizeTieredCompactionSelector selects files to be compacted based on the size of files. The
@@ -86,9 +98,17 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
         new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
     try {
       int maxLevel = searchMaxFileLevel();
-      for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
-        if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
-          break;
+      for (int currentLevel = 0;
+          currentLevel <= maxLevel && currentLevel <= config.getMaxCompactionLevel();
+          currentLevel++) {
+        if (this.sequence) {
+          if (!selectLevelTaskSeq(currentLevel, taskPriorityQueue)) {
+            break;
+          }
+        } else {
+          if (!selectLevelTaskUnseq(currentLevel, taskPriorityQueue)) {
+            break;
+          }
         }
       }
       while (taskPriorityQueue.size() > 0) {
@@ -113,19 +133,235 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
    * @return whether to continue the search to higher levels
    * @throws IOException
    */
-  private boolean selectLevelTask(
+  private boolean selectLevelTaskSeq(
+      int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+      throws IOException {
+    boolean shouldContinueToSearch = true;
+    List<TsFileResource> selectedFileList = new ArrayList<>();
+    long selectedFileSize = 0L;
+    long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+    for (TsFileResource currentFile : tsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      if (currentName.getInnerCompactionCnt() != level
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
+        selectedFileList.clear();
+        selectedFileSize = 0L;
+        continue;
+      }
+      LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize());
+      selectedFileList.add(currentFile);
+      selectedFileSize += currentFile.getTsFileSize();
+      LOGGER.debug(
+          "Add tsfile {}, current select file num is {}, size is {}",
+          currentFile,
+          selectedFileList.size(),
+          selectedFileSize);
+      // if the file size or file number reaches the threshold
+      if (selectedFileSize >= targetCompactionFileSize
+          || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+        // submit the task
+        if (selectedFileList.size() > 1) {
+          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+        }
+        selectedFileList = new ArrayList<>();
+        selectedFileSize = 0L;
+        shouldContinueToSearch = false;
+      }
+    }
+    return shouldContinueToSearch;
+  }
+
+  private boolean selectLevelTaskUnseq(
+      int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+      throws IOException {
+    if (tsFileResources.size() <= 1) {
+      return true;
+    }
+    if (config.getMaxInnerCompactionCandidateFileNum() == 1) {
+      return false;
+    }
+    String compactionSelectFileMethod = config.getCompactionSelectFileMethod();
+
+    if (compactionSelectFileMethod.equals("oldest")) {
+      return selectLevelTaskUnseqOldest(level, taskPriorityQueue);
+    } else if (compactionSelectFileMethod.equals("round")) {
+      return selectLevelTaskUnseqRound(level, taskPriorityQueue);
+    }
+
+    /* greedy */
+    long startTime = System.currentTimeMillis();
+    boolean shouldContinueToSearch = true;
+    List<TsFileResource> selectedFileList = new ArrayList<>();
+    ArrayList<Integer> selectedFileIdx = new ArrayList<>();
+    long selectedFileSize = 0L;
+    long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+    ArrayList<ArrayList<Long>> timeLists = new ArrayList<>();
+    ArrayList<ArrayList<ArrayList<Boolean>>> allBitmapLists = new ArrayList<>();
+    ArrayList<ArrayList<String>> schemaLists = new ArrayList<>();
+    ArrayList<TsFileResource> curLevelTsFileResources = new ArrayList<>();
+
+    for (TsFileResource currentFile : tsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      if (currentName.getInnerCompactionCnt() != level
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
+        continue;
+      } else {
+        if (currentFile.getTsFileSize() > 0) {
+          curLevelTsFileResources.add(currentFile);
+        }
+      }
+    }
+
+    if (curLevelTsFileResources.size() <= config.getMaxFileNumInLevel()) {
+      return shouldContinueToSearch;
+    }
+
+    for (TsFileResource currentFile : curLevelTsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      // read all files
+      if (!this.sequence) {
+        ArrayList<Long> timeList = new ArrayList<>();
+        ArrayList<ArrayList<Boolean>> bitmapLists = new ArrayList<>();
+        ArrayList<String> schemaList = new ArrayList<>();
+        readMultiFileDataWithoutValues(currentFile, timeList, bitmapLists, schemaList);
+        timeLists.add(timeList);
+        allBitmapLists.add(bitmapLists);
+        schemaLists.add(schemaList);
+      }
+    }
+    long readFileStopTime = System.currentTimeMillis();
+    double readFileTime = (readFileStopTime - startTime) / 1000.0d;
+
+    // generate new task queue for compaction
+    // greedy add files
+    int i;
+    int maxOverlapFinal = -1;
+
+    int selectedNum = config.getMaxInnerCompactionCandidateFileNum();
+
+    for (i = 0; i < curLevelTsFileResources.size() - selectedNum; i++) {
+      ArrayList<Integer> curSelectedFileIdx = new ArrayList<>();
+      List<TsFileResource> curSelectedFileList = new ArrayList<>();
+      long curSelectedFileSize = 0L;
+      curSelectedFileList.add(curLevelTsFileResources.get(i));
+      curSelectedFileIdx.add(i);
+      curSelectedFileSize += curLevelTsFileResources.get(i).getTsFileSize();
+      int totalOverlap = 0;
+      while (curSelectedFileList.size() < timeLists.size()) {
+        int maxOverlapIdx = -1;
+        int maxOverlap = -1;
+        for (int j = curSelectedFileIdx.get(curSelectedFileList.size() - 1) + 1;
+            j < timeLists.size();
+            j++) {
+          int curOverlap = -1;
+          for (int idx : curSelectedFileIdx) {
+            int tmpOverlap =
+                computeOverlapWithBitmap(
+                    timeLists.get(idx),
+                    timeLists.get(j),
+                    allBitmapLists.get(idx),
+                    allBitmapLists.get(j),
+                    schemaLists.get(idx),
+                    schemaLists.get(j));
+            if (tmpOverlap > curOverlap) {
+              curOverlap = tmpOverlap;
+            }
+          }
+          if (curOverlap > maxOverlap) {
+            maxOverlap = curOverlap;
+            maxOverlapIdx = j;
+          }
+        }
+        if (maxOverlap == -1 || maxOverlapIdx == -1) {
+          break;
+        }
+        TsFileResource currentFile = curLevelTsFileResources.get(maxOverlapIdx);
+        curSelectedFileIdx.add(maxOverlapIdx);
+        curSelectedFileList.add(currentFile);
+        curSelectedFileSize += currentFile.getTsFileSize();
+        totalOverlap += maxOverlap;
+
+        if (curSelectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+          // check overlap
+          if (totalOverlap > maxOverlapFinal) {
+            maxOverlapFinal = totalOverlap;
+            selectedFileSize = curSelectedFileSize;
+            selectedFileIdx = new ArrayList<>();
+            for (int tmpIdx : curSelectedFileIdx) {
+              selectedFileIdx.add(tmpIdx);
+            }
+            selectedFileList = new ArrayList<>();
+            for (TsFileResource tsfile : curSelectedFileList) {
+              selectedFileList.add(tsfile);
+            }
+          }
+          break;
+        }
+      }
+    }
+
+    if (selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+      // submit the task
+      if (selectedFileList.size() > 1) {
+        taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+        shouldContinueToSearch = false;
+      }
+    }
+    long submitCompactionStopTime = System.currentTimeMillis();
+    double submitCompactionTime = (submitCompactionStopTime - readFileStopTime) / 1000.0d;
+    return shouldContinueToSearch;
+  }
+
+  private boolean selectLevelTaskUnseqRound(
       int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
       throws IOException {
+    long startTime = System.currentTimeMillis();
     boolean shouldContinueToSearch = true;
     List<TsFileResource> selectedFileList = new ArrayList<>();
     long selectedFileSize = 0L;
     long targetCompactionFileSize = config.getTargetCompactionFileSize();
 
+    int levelFileNum = tsFileResources.size();
     for (TsFileResource currentFile : tsFileResources) {
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
       if (currentName.getInnerCompactionCnt() != level
-          || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
+        levelFileNum--;
+      }
+    }
+    if (levelFileNum < config.getMaxFileNumInLevel()) {
+      return shouldContinueToSearch;
+    }
+
+    ArrayList<TsFileResource> randomTsFileResources = new ArrayList<>();
+    ArrayList<Integer> idxList = new ArrayList<>();
+    for (int i = 0; i < tsFileResources.size(); ++i) {
+      idxList.add(i);
+    }
+    Collections.shuffle(idxList);
+    for (int i = 0; i < tsFileResources.size(); ++i) {
+      randomTsFileResources.add(tsFileResources.get(idxList.get(i)));
+    }
+
+    for (TsFileResource currentFile : randomTsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      if (currentName.getInnerCompactionCnt() != level
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
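
In outline, the greedy ("mcc") path above treats every file in the level as a
possible seed, then repeatedly pulls in the remaining file whose best pairwise
overlap with the already-selected set is largest, and finally keeps the seed
whose accumulated overlap is highest. A minimal, self-contained sketch of that
loop (not the commit's code: overlap[i][j] stands in for
computeOverlapWithBitmap on files i and j, and n for
max_inner_compaction_candidate_file_num):

    import java.util.ArrayList;
    import java.util.List;

    public class GreedyPickSketch {
      // Returns the indices of n files grown greedily from the seed whose
      // accumulated pairwise overlap comes out highest.
      static List<Integer> greedyPick(int fileCount, int n, int[][] overlap) {
        List<Integer> best = new ArrayList<>();
        int bestTotal = -1;
        for (int seed = 0; seed < fileCount - n; seed++) {
          List<Integer> picked = new ArrayList<>();
          picked.add(seed);
          int total = 0;
          while (picked.size() < n) {
            int argmax = -1;
            int max = -1;
            // candidate indices stay in ascending order, as in the commit
            for (int j = picked.get(picked.size() - 1) + 1; j < fileCount; j++) {
              int cur = -1;
              for (int idx : picked) {
                cur = Math.max(cur, overlap[idx][j]);
              }
              if (cur > max) {
                max = cur;
                argmax = j;
              }
            }
            if (argmax < 0) {
              break; // ran out of candidates before reaching n files
            }
            picked.add(argmax);
            total += max;
          }
          if (picked.size() == n && total > bestTotal) {
            bestTotal = total;
            best = picked;
          }
        }
        return best;
      }
    }
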
@@ -148,11 +384,75 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
         selectedFileList = new ArrayList<>();
         selectedFileSize = 0L;
         shouldContinueToSearch = false;
+        break;
       }
     }
     return shouldContinueToSearch;
   }
 
+  private boolean selectLevelTaskUnseqOldest(
+      int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+      throws IOException {
+    long startTime = System.currentTimeMillis();
+    long submitCompactionStopTime = System.currentTimeMillis();
+    double submitCompactionTime = (submitCompactionStopTime - startTime) / 1000.0d;
+
+    boolean shouldContinueToSearch = true;
+    List<TsFileResource> selectedFileList = new ArrayList<>();
+    long selectedFileSize = 0L;
+    long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+    int levelFileNum = tsFileResources.size();
+    for (TsFileResource currentFile : tsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      if (currentName.getInnerCompactionCnt() != level
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
+        levelFileNum--;
+      }
+    }
+    if (levelFileNum < config.getMaxFileNumInLevel()) {
+      return shouldContinueToSearch;
+    }
+
+    for (TsFileResource currentFile : tsFileResources) {
+      TsFileNameGenerator.TsFileName currentName =
+          TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+      if (currentName.getInnerCompactionCnt() != level
+          || currentFile.isCompactionCandidate()
+          || currentFile.isCompacting()
+          || !currentFile.isClosed()) {
+        selectedFileList.clear();
+        selectedFileSize = 0L;
+        continue;
+      }
+      LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize());
+      selectedFileList.add(currentFile);
+      selectedFileSize += currentFile.getTsFileSize();
+      LOGGER.debug(
+          "Add tsfile {}, current select file num is {}, size is {}",
+          currentFile,
+          selectedFileList.size(),
+          selectedFileSize);
+      // if the file size or file number reaches the threshold
+      if (selectedFileSize >= targetCompactionFileSize
+          || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+        // submit the task
+        if (selectedFileList.size() > 1) {
+          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+        }
+        selectedFileList = new ArrayList<>();
+        selectedFileSize = 0L;
+        shouldContinueToSearch = false;
+        break;
+      }
+    }
+
+    return shouldContinueToSearch;
+  }
+
   private int searchMaxFileLevel() throws IOException {
     int maxLevel = -1;
     Iterator<TsFileResource> iterator = tsFileResources.iterator();
@@ -180,6 +480,154 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
     return CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
   }
 
+  private void readMultiFileDataWithoutValues(
+      TsFileResource tsFileResource,
+      ArrayList<Long> timeList,
+      ArrayList<ArrayList<Boolean>> bitmapLists,
+      ArrayList<String> schemaList) {
+    try {
+      List<TsFileResource> unseqFileResources = new ArrayList<>();
+      unseqFileResources.add(tsFileResource);
+      long queryId = QueryResourceManager.getInstance().assignCompactionPrefetchQueryId();
+      QueryContext queryContext = new QueryContext(queryId);
+      QueryDataSource queryDataSource = new QueryDataSource(new ArrayList<>(), unseqFileResources);
+      QueryResourceManager.getInstance()
+          .getQueryFileManager()
+          .addUsedFilesForQuery(queryId, queryDataSource);
+      MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(unseqFileResources);
+
+      while (deviceIterator.hasNextDevice()) {
+        Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
+        String device = deviceInfo.left;
+        boolean isAligned = deviceInfo.right;
+        QueryUtils.fillOrderIndexes(queryDataSource, device, true);
+
+        if (isAligned) {
+          Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
+          List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
+          if (measurementSchemas.isEmpty()) {
+            return;
+          }
+          List<String> existedMeasurements =
+              measurementSchemas.stream()
+                  .map(IMeasurementSchema::getMeasurementId)
+                  .collect(Collectors.toList());
+          for (String schema : existedMeasurements) {
+            schemaList.add(schema);
+          }
+          IBatchReader dataBatchReader =
+              //              constructReader(
+              constructPrefetchReader(
+                  device,
+                  existedMeasurements,
+                  measurementSchemas,
+                  schemaMap.keySet(),
+                  queryContext,
+                  queryDataSource,
+                  true);
+
+          for (int i = 0; i < existedMeasurements.size(); i++) {
+            bitmapLists.add(new ArrayList<>());
+          }
+
+          if (dataBatchReader.hasNextBatch()) {
+            while (dataBatchReader.hasNextBatch()) {
+              BatchData batchData = dataBatchReader.nextBatch();
+              while (batchData.hasCurrent()) {
+                long time = batchData.currentTime();
+                timeList.add(time);
+                TsPrimitiveType[] value = (TsPrimitiveType[]) batchData.currentValue();
+                for (int j = 0; j < value.length; j++) {
+                  if (value[j] == null) {
+                    bitmapLists.get(j).add(false);
+                  } else {
+                    bitmapLists.get(j).add(true);
+                  }
+                }
+                batchData.next();
+              }
+            }
+          }
+        }
+        tsFileResource.readUnlock();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Exception occurs while reading multiple file data", e);
+    }
+  }
+
+  private int computeOverlapWithBitmap(
+      ArrayList<Long> timeList1,
+      ArrayList<Long> timeList2,
+      ArrayList<ArrayList<Boolean>> bitmap1,
+      ArrayList<ArrayList<Boolean>> bitmap2,
+      ArrayList<String> schemaList1,
+      ArrayList<String> schemaList2) {
+    int p1 = 0;
+    int p2 = 0;
+    int overlap = 0;
+    ArrayList<Pair<Integer, Integer>> schemaPairs = new ArrayList<>();
+    for (int i = 0; i < schemaList1.size(); i++) {
+      for (int j = 0; j < schemaList2.size(); j++) {
+        if (schemaList1.get(i).equals(schemaList2.get(j))) {
+          schemaPairs.add(new Pair<>(i, j));
+        }
+      }
+    }
+    int overlapOfRow = schemaPairs.size();
+    int updateCheck = -1;
+    while (p1 < timeList1.size() && p2 < timeList2.size()) {
+      if (timeList1.get(p1) < timeList2.get(p2)) {
+        p1++;
+      } else {
+        if (timeList1.get(p1) > timeList2.get(p2)) {
+          p2++;
+        } else {
+          if (updateCheck == 0) {
+            // insert
+            overlap += 1;
+          } else if (updateCheck == 1) {
+            // update
+            overlap += overlapOfRow + 1;
+          } else {
+            Pair<Integer, Integer> schemaPair = schemaPairs.get(0);
+            if (bitmap1.get(schemaPair.left).get(p1) && bitmap2.get(schemaPair.right).get(p2)) {
+              updateCheck = 1;
+            } else {
+              updateCheck = 0;
+            }
+          }
+
+          p1++;
+          p2++;
+        }
+      }
+    }
+    return overlap;
+  }
+
+  public static IBatchReader constructPrefetchReader(
+      String deviceId,
+      List<String> measurementIds,
+      List<IMeasurementSchema> measurementSchemas,
+      Set<String> allSensors,
+      QueryContext queryContext,
+      QueryDataSource queryDataSource,
+      boolean isAlign)
+      throws IllegalPathException {
+    PartialPath seriesPath;
+    TSDataType tsDataType;
+    if (isAlign) {
+      seriesPath = new AlignedPath(deviceId, measurementIds, measurementSchemas);
+      tsDataType = TSDataType.VECTOR;
+    } else {
+      seriesPath = new MeasurementPath(deviceId, measurementIds.get(0), measurementSchemas.get(0));
+      tsDataType = measurementSchemas.get(0).getType();
+    }
+    return new SeriesRawDataPrefetchReader(
+        seriesPath, allSensors, tsDataType, queryContext, queryDataSource, null, null, null, true);
+  }
+
   private class SizeTieredCompactionTaskComparator
       implements Comparator<Pair<List<TsFileResource>, Long>> {
 
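The overlap metric that drives the greedy search walks the two files' sorted
timestamp lists with two pointers. The first shared timestamp is only used to
classify the file pair via the bitmaps of the first common column: if both
files carry a value there, later matches count as updates (overlapOfRow + 1,
where overlapOfRow is the number of shared columns), otherwise as plain
inserts (1 each). Stripped of that column bookkeeping, the core is an
ordinary sorted-list intersection; a runnable illustration (a hypothetical
standalone class, not part of the commit):

    import java.util.Arrays;
    import java.util.List;

    public class OverlapCoreSketch {
      // Two-pointer intersection of two ascending timestamp lists: the
      // skeleton that computeOverlapWithBitmap() is built around.
      static int countSharedTimestamps(List<Long> t1, List<Long> t2) {
        int p1 = 0;
        int p2 = 0;
        int shared = 0;
        while (p1 < t1.size() && p2 < t2.size()) {
          long a = t1.get(p1);
          long b = t2.get(p2);
          if (a < b) {
            p1++;
          } else if (a > b) {
            p2++;
          } else {
            shared++; // both files wrote data at this timestamp
            p1++;
            p2++;
          }
        }
        return shared;
      }

      public static void main(String[] args) {
        // file A covers t = 1,3,5,7; file B covers t = 3,4,5,6
        List<Long> a = Arrays.asList(1L, 3L, 5L, 7L);
        List<Long> b = Arrays.asList(3L, 4L, 5L, 6L);
        System.out.println(countSharedTimestamps(a, b)); // 2 (t = 3 and t = 5)
      }
    }

The update weighting means files that keep rewriting the same columns at the
same timestamps score much higher than files that merely interleave new rows,
which appears to be the intent: compacting those files together removes the
most redundancy.
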
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index aba0a9adec..87fce04c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -90,7 +90,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
     long startTime = System.currentTimeMillis();
     // get resource of target file
     String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
-    // Here is tmpTargetFile, which is xxx.target
     TsFileResource targetTsFileResource =
         TsFileNameGenerator.getInnerCompactionTargetFileResource(
             selectedTsFileResourceList, sequence);
@@ -196,13 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
         FileUtils.delete(logFile);
       }
 
-      if (targetTsFileResource.isDeleted()) {
-        // target resource is empty after compaction, then delete it
-        targetTsFileResource.remove();
-      } else {
-        // set target resource to CLOSED, so that it can be selected to compact
-        targetTsFileResource.setStatus(TsFileResourceStatus.CLOSED);
-      }
     } catch (Throwable throwable) {
       LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
       if (sizeTieredCompactionLogger != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 9ae5b7db1e..51fa6bca97 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -37,7 +37,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesBitmapReader;
 import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
+import org.apache.iotdb.db.query.reader.series.SeriesBitmapReader;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -281,6 +283,28 @@ public class AlignedPath extends PartialPath {
         ascending);
   }
 
+  @Override
+  public SeriesBitmapReader createSeriesBitmapReader(
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    return new AlignedSeriesBitmapReader(
+        this,
+        allSensors,
+        dataType,
+        context,
+        dataSource,
+        timeFilter,
+        valueFilter,
+        fileFilter,
+        ascending);
+  }
+
   @Override
   @TestOnly
   public AlignedSeriesReader createSeriesReader(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index b55cacd6fc..9cce9e516b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.fill.LastPointReader;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesBitmapReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -504,6 +505,18 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
     throw new UnsupportedOperationException("Should call exact sub class!");
   }
 
+  public SeriesBitmapReader createSeriesBitmapReader(
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    throw new UnsupportedOperationException("Should call exact sub class!");
+  }
+
   @TestOnly
   public SeriesReader createSeriesReader(
       Set<String> allSensors,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index a5ecb90dbc..be9dc914b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -103,6 +103,13 @@ public class QueryResourceManager {
     return queryId;
   }
 
+  public long assignCompactionPrefetchQueryId() {
+    long threadNum = 10;
+    long queryId = Long.MIN_VALUE + threadNum;
+    filePathsManager.addQueryId(queryId);
+    return queryId;
+  }
+
   /**
    * register temporary file generated by external sort for resource release.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
index 3487682a2e..d8ae569588 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
@@ -64,4 +64,20 @@ public class DiskAlignedChunkLoader implements IChunkLoader {
     }
     return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter);
   }
+
+  public IChunkReader getChunkPrefetchReader(IChunkMetadata chunkMetaData, Filter timeFilter)
+      throws IOException {
+    AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData;
+    Chunk timeChunk =
+        ChunkCache.getInstance()
+            .get((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(), debug);
+    List<Chunk> valueChunkList = new ArrayList<>();
+    for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+      valueChunkList.add(
+          valueChunkMetadata == null
+              ? null
+              : ChunkCache.getInstance().get((ChunkMetadata) valueChunkMetadata, debug));
+    }
+    return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter, true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java
new file mode 100644
index 0000000000..4cdf3dcb73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+public class AlignedSeriesBitmapReader extends SeriesBitmapReader {
+
+  public AlignedSeriesBitmapReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    super(
+        seriesPath,
+        allSensors,
+        dataType,
+        context,
+        dataSource,
+        timeFilter,
+        valueFilter,
+        fileFilter,
+        ascending);
+  }
+
+  @TestOnly
+  public AlignedSeriesBitmapReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    super(
+        seriesPath,
+        allSensors,
+        dataType,
+        context,
+        seqFileResource,
+        unseqFileResource,
+        timeFilter,
+        valueFilter,
+        ascending);
+  }
+
+  @Override
+  protected PriorityMergeReader getPriorityMergeReader() {
+    return new AlignedPriorityMergeReader();
+  }
+
+  @Override
+  protected DescPriorityMergeReader getDescPriorityMergeReader() {
+    return new AlignedDescPriorityMergeReader();
+  }
+
+  @Override
+  protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, (AlignedPath) seriesPath, context, filter);
+  }
+
+  @Override
+  /**
+   * This method should be called after hasNextChunk() until no next page, make sure that all
+   * overlapped pages are consumed
+   */
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  boolean hasNextPage() throws IOException {
+    if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+      return false;
+    }
+
+    /*
+     * has overlapped data before
+     */
+    if (hasCachedNextOverlappedPage) {
+      return true;
+    } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+      if (hasNextOverlappedPage()) {
+        cachedBatchData = nextOverlappedPage();
+        if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+          hasCachedNextOverlappedPage = true;
+          return true;
+        }
+      }
+    }
+
+    if (firstPageReader != null) {
+      return true;
+    }
+
+    /*
+     * construct first page reader
+     */
+    if (firstChunkMetadata != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToPageReaders(
+          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first chunk metadata is already unpacked, consume cached pages
+       */
+      initFirstPageReader();
+    }
+
+    if (isExistOverlappedPage()) {
+      return true;
+    }
+
+    // make sure firstPageReader won't be null while unSeqPageReaders still has cached page
+    // readers
+    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+      initFirstPageReader();
+
+      if (isExistOverlappedPage()) {
+        return true;
+      }
+    }
+    return firstPageReader != null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java
new file mode 100644
index 0000000000..892f0ce23f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
+import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+public class SeriesBitmapReader {
+
+  // inner class of SeriesReader for order purpose
+  protected TimeOrderUtils orderUtils;
+
+  protected final PartialPath seriesPath;
+
+  // all the sensors in this device;
+  protected final Set<String> allSensors;
+  protected final TSDataType dataType;
+  protected final QueryContext context;
+
+  /*
+   * At most one of timeFilter and valueFilter is not null
+   *
+   * timeFilter is pushed down to all pages (seq, unseq) without correctness problems
+   *
+   * valueFilter is pushed down to non-overlapped pages only
+   */
+  protected final Filter timeFilter;
+  protected final Filter valueFilter;
+  protected final TsFileFilter fileFilter;
+
+  protected final QueryDataSource dataSource;
+
+  /*
+   * file index
+   */
+  protected int curSeqFileIndex;
+  protected int curUnseqFileIndex;
+
+  /*
+   * TimeSeriesMetadata cache
+   */
+  protected ITimeSeriesMetadata firstTimeSeriesMetadata;
+  protected final List<ITimeSeriesMetadata> seqTimeSeriesMetadata = new LinkedList<>();
+  protected final PriorityQueue<ITimeSeriesMetadata> unSeqTimeSeriesMetadata;
+
+  /*
+   * chunk cache
+   */
+  protected IChunkMetadata firstChunkMetadata;
+  protected final PriorityQueue<IChunkMetadata> cachedChunkMetadata;
+
+  /*
+   * page cache
+   */
+  protected VersionPageReader firstPageReader;
+  protected final List<VersionPageReader> seqPageReaders = new LinkedList<>();
+  protected final PriorityQueue<VersionPageReader> unSeqPageReaders;
+
+  /*
+   * point cache
+   */
+  protected final PriorityMergeReader mergeReader;
+
+  /*
+   * result cache
+   */
+  protected boolean hasCachedNextOverlappedPage;
+  protected BatchData cachedBatchData;
+
+  /**
+   * @param seriesPath For querying aligned series, the seriesPath should be an AlignedPath. All
+   *     selected series belonging to one aligned device should be in this one AlignedPath's
+   *     measurementList.
+   * @param allSensors For querying aligned series, allSensors are not used.
+   */
+  public SeriesBitmapReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    this.seriesPath = IDTable.translateQueryPath(seriesPath);
+    this.allSensors = allSensors;
+    this.dataType = dataType;
+    this.context = context;
+    this.dataSource = dataSource;
+    this.timeFilter = timeFilter;
+    this.valueFilter = valueFilter;
+    this.fileFilter = fileFilter;
+    if (ascending) {
+      this.orderUtils = new AscTimeOrderUtils();
+      mergeReader = getPriorityMergeReader();
+      this.curSeqFileIndex = 0;
+      this.curUnseqFileIndex = 0;
+    } else {
+      this.orderUtils = new DescTimeOrderUtils();
+      mergeReader = getDescPriorityMergeReader();
+      this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
+      this.curUnseqFileIndex = 0;
+    }
+
+    unSeqTimeSeriesMetadata =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
+    cachedChunkMetadata =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
+    unSeqPageReaders =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+  }
+
+  @TestOnly
+  @SuppressWarnings("squid:S107")
+  public SeriesBitmapReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    this.seriesPath = IDTable.translateQueryPath(seriesPath);
+    this.allSensors = allSensors;
+    this.dataType = dataType;
+    this.context = context;
+    this.dataSource = new QueryDataSource(seqFileResource, unseqFileResource);
+    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), ascending);
+    this.timeFilter = timeFilter;
+    this.valueFilter = valueFilter;
+    this.fileFilter = null;
+    if (ascending) {
+      this.orderUtils = new AscTimeOrderUtils();
+      mergeReader = getPriorityMergeReader();
+      this.curSeqFileIndex = 0;
+      this.curUnseqFileIndex = 0;
+    } else {
+      this.orderUtils = new DescTimeOrderUtils();
+      mergeReader = getDescPriorityMergeReader();
+      this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
+      this.curUnseqFileIndex = 0;
+    }
+
+    unSeqTimeSeriesMetadata =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
+    cachedChunkMetadata =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
+    unSeqPageReaders =
+        new PriorityQueue<>(
+            orderUtils.comparingLong(
+                versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+  }
+
+  protected PriorityMergeReader getPriorityMergeReader() {
+    return new PriorityMergeReader();
+  }
+
+  protected DescPriorityMergeReader getDescPriorityMergeReader() {
+    return new DescPriorityMergeReader();
+  }
+
+  public boolean isEmpty() throws IOException {
+    return !(hasNextPage() || hasNextChunk() || hasNextFile());
+  }
+
+  boolean hasNextFile() throws IOException {
+    if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+      return false;
+    }
+
+    if (!unSeqPageReaders.isEmpty()
+        || firstPageReader != null
+        || mergeReader.hasNextTimeValuePair()) {
+      throw new IOException(
+          "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+              + unSeqPageReaders.isEmpty()
+              + " firstPageReader != null is "
+              + (firstPageReader != null)
+              + " mergeReader.hasNextTimeValuePair() = "
+              + mergeReader.hasNextTimeValuePair());
+    }
+
+    if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
+      throw new IOException("all cached chunks should be consumed first");
+    }
+
+    if (firstTimeSeriesMetadata != null) {
+      return true;
+    }
+
+    while (firstTimeSeriesMetadata == null
+        && (orderUtils.hasNextSeqResource()
+            || orderUtils.hasNextUnseqResource()
+            || !seqTimeSeriesMetadata.isEmpty()
+            || !unSeqTimeSeriesMetadata.isEmpty())) {
+      // init first time series metadata whose startTime is minimum
+      tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
+    }
+
+    return firstTimeSeriesMetadata != null;
+  }
+
+  boolean isFileOverlapped() throws IOException {
+    if (firstTimeSeriesMetadata == null) {
+      throw new IOException("no first file");
+    }
+
+    Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics();
+    return !seqTimeSeriesMetadata.isEmpty()
+            && orderUtils.isOverlapped(fileStatistics, seqTimeSeriesMetadata.get(0).getStatistics())
+        || !unSeqTimeSeriesMetadata.isEmpty()
+            && orderUtils.isOverlapped(
+                fileStatistics, unSeqTimeSeriesMetadata.peek().getStatistics());
+  }
+
+  Statistics currentFileStatistics() {
+    return firstTimeSeriesMetadata.getStatistics();
+  }
+
+  Statistics currentFileStatistics(int index) throws IOException {
+    if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
+      throw new IOException("Can only get statistics by index from alignedTimeSeriesMetaData");
+    }
+    return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
+  }
+
+  public Statistics currentFileTimeStatistics() throws IOException {
+    if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
+      throw new IOException("Can only get statistics of time column from alignedChunkMetaData");
+    }
+    return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics();
+  }
+
+  boolean currentFileModified() throws IOException {
+    if (firstTimeSeriesMetadata == null) {
+      throw new IOException("no first file");
+    }
+    return firstTimeSeriesMetadata.isModified();
+  }
+
+  void skipCurrentFile() {
+    firstTimeSeriesMetadata = null;
+  }
+
+  /**
+   * This method should be called after hasNextFile() until there is no next chunk, to make sure
+   * that all overlapped chunks are consumed
+   */
+  boolean hasNextChunk() throws IOException {
+    if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+      return false;
+    }
+
+    if (!unSeqPageReaders.isEmpty()
+        || firstPageReader != null
+        || mergeReader.hasNextTimeValuePair()) {
+      throw new IOException(
+          "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+              + unSeqPageReaders.isEmpty()
+              + " firstPageReader != null is "
+              + (firstPageReader != null)
+              + " mergeReader.hasNextTimeValuePair() = "
+              + mergeReader.hasNextTimeValuePair());
+    }
+
+    if (firstChunkMetadata != null) {
+      return true;
+    }
+
+    while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
+      initFirstChunkMetadata();
+    }
+    return firstChunkMetadata != null;
+  }
+
+  /** construct first chunk metadata */
+  private void initFirstChunkMetadata() throws IOException {
+    if (firstTimeSeriesMetadata != null) {
+      /*
+       * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata
+       */
+      unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+          orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()));
+      unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+          orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first time series metadata is already unpacked, consume cached ChunkMetadata
+       */
+      while (!cachedChunkMetadata.isEmpty()) {
+        firstChunkMetadata = cachedChunkMetadata.peek();
+        unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+            orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
+        unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+            orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+        if (firstChunkMetadata.equals(cachedChunkMetadata.peek())) {
+          firstChunkMetadata = cachedChunkMetadata.poll();
+          break;
+        }
+      }
+    }
+    if (valueFilter != null
+        && firstChunkMetadata != null
+        && !isChunkOverlapped()
+        && !firstChunkMetadata.isModified()
+        && !valueFilter.satisfy(firstChunkMetadata.getStatistics())) {
+      skipCurrentChunk();
+    }
+  }
+
+  private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+      long endpointTime, boolean init) throws IOException {
+    while (!seqTimeSeriesMetadata.isEmpty()
+        && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
+      unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+    }
+    while (!unSeqTimeSeriesMetadata.isEmpty()
+        && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
+      unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+    }
+
+    if (firstTimeSeriesMetadata != null
+        && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
+      unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata);
+      firstTimeSeriesMetadata = null;
+    }
+
+    if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) {
+      firstChunkMetadata = cachedChunkMetadata.poll();
+    }
+  }
+
+  protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata)
+      throws IOException {
+    List<IChunkMetadata> chunkMetadataList =
+        FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
+    chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
+
+    // for tracing: calculate the number of chunks and the number of time-value points in them
+    if (context.isEnableTracing()) {
+      long totalChunkPointsNum =
+          chunkMetadataList.stream()
+              .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount())
+              .sum();
+      TracingManager.getInstance()
+          .addChunkInfo(
+              context.getQueryId(),
+              chunkMetadataList.size(),
+              totalChunkPointsNum,
+              timeSeriesMetadata.isSeq());
+    }
+
+    cachedChunkMetadata.addAll(chunkMetadataList);
+  }
+
+  boolean isChunkOverlapped() throws IOException {
+    if (firstChunkMetadata == null) {
+      throw new IOException("no first chunk");
+    }
+
+    Statistics chunkStatistics = firstChunkMetadata.getStatistics();
+    return !cachedChunkMetadata.isEmpty()
+        && orderUtils.isOverlapped(chunkStatistics, cachedChunkMetadata.peek().getStatistics());
+  }
+
+  Statistics currentChunkStatistics() {
+    return firstChunkMetadata.getStatistics();
+  }
+
+  Statistics currentChunkStatistics(int index) throws IOException {
+    if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
+      throw new IOException("Can only get statistics by index from vectorChunkMetaData");
+    }
+    return ((AlignedChunkMetadata) firstChunkMetadata).getStatistics(index);
+  }
+
+  Statistics currentChunkTimeStatistics() throws IOException {
+    if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
+      throw new IOException("Can only get statistics of time column from alignedChunkMetaData");
+    }
+    return ((AlignedChunkMetadata) firstChunkMetadata).getTimeStatistics();
+  }
+
+  boolean currentChunkModified() throws IOException {
+    if (firstChunkMetadata == null) {
+      throw new IOException("no first chunk");
+    }
+    return firstChunkMetadata.isModified();
+  }
+
+  void skipCurrentChunk() {
+    firstChunkMetadata = null;
+  }
+
+  /**
+   * This method should be called after hasNextChunk(). Keep calling it until it returns false, to
+   * make sure that all overlapped pages are consumed.
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  boolean hasNextPage() throws IOException {
+    if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+      return false;
+    }
+
+    /*
+     * has overlapped data before
+     */
+    if (hasCachedNextOverlappedPage) {
+      return true;
+    } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+      if (hasNextOverlappedPage()) {
+        cachedBatchData = nextOverlappedPage();
+        if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+          hasCachedNextOverlappedPage = true;
+          return true;
+        }
+      }
+    }
+
+    if (firstPageReader != null) {
+      return true;
+    }
+
+    /*
+     * construct first page reader
+     */
+    if (firstChunkMetadata != null) {
+      /*
+       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+       */
+      unpackAllOverlappedChunkMetadataToPageReaders(
+          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+    } else {
+      /*
+       * first chunk metadata is already unpacked, consume cached pages
+       */
+      initFirstPageReader();
+    }
+
+    if (isExistOverlappedPage()) {
+      return true;
+    }
+
+    // make sure firstPageReader is not null while seqPageReaders or unSeqPageReaders still have
+    // cached page readers
+    while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+      initFirstPageReader();
+
+      if (isExistOverlappedPage()) {
+        return true;
+      }
+    }
+    return firstPageReader != null;
+  }
+
+  public boolean isExistOverlappedPage() throws IOException {
+    if (firstPageOverlapped()) {
+      /*
+       * next page is overlapped, read overlapped data and cache it
+       */
+      if (hasNextOverlappedPage()) {
+        cachedBatchData = nextOverlappedPage();
+        if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+          hasCachedNextOverlappedPage = true;
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  public boolean firstPageOverlapped() throws IOException {
+    if (firstPageReader == null) {
+      return false;
+    }
+
+    long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+    unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
+    unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
+    unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
+
+    return (!seqPageReaders.isEmpty()
+            && orderUtils.isOverlapped(
+                firstPageReader.getStatistics(), seqPageReaders.get(0).getStatistics()))
+        || (!unSeqPageReaders.isEmpty()
+                && orderUtils.isOverlapped(
+                    firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics())
+            || (mergeReader.hasNextTimeValuePair()
+                && orderUtils.isOverlapped(
+                    mergeReader.currentTimeValuePair().getTimestamp(),
+                    firstPageReader.getStatistics())));
+  }
+
+  public void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init)
+      throws IOException {
+    if (firstChunkMetadata != null
+        && orderUtils.isOverlapped(endpointTime, firstChunkMetadata.getStatistics())) {
+      unpackOneChunkMetaData(firstChunkMetadata);
+      firstChunkMetadata = null;
+    }
+    // unpack at most one sequence chunk per pass, in case of unpacking too many sequence chunks
+    boolean hasMeetSeq = false;
+    while (!cachedChunkMetadata.isEmpty()
+        && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.peek().getStatistics())) {
+      if (cachedChunkMetadata.peek().isSeq() && hasMeetSeq) {
+        break;
+      } else if (cachedChunkMetadata.peek().isSeq()) {
+        hasMeetSeq = true;
+      }
+      unpackOneChunkMetaData(cachedChunkMetadata.poll());
+    }
+    if (init
+        && firstPageReader == null
+        && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+      initFirstPageReader();
+    }
+  }
+
+  private void unpackOneChunkMetaData(IChunkMetadata chunkMetaData) throws IOException {
+    // use the prefetch variant instead of FileLoaderUtils.loadPageReaderList, so that aligned
+    // chunks deserialize only the time column and the value bitmaps
+    List<IPageReader> pageReaderList =
+        FileLoaderUtils.loadPagePrefetchReaderList(chunkMetaData, timeFilter);
+
+    // for tracing: record the number of pages
+    if (context.isEnableTracing()) {
+      addTotalPageNumInTracing(context.getQueryId(), pageReaderList.size());
+    }
+
+    if (chunkMetaData.isSeq()) {
+      if (orderUtils.getAscending()) {
+        for (IPageReader iPageReader : pageReaderList) {
+          seqPageReaders.add(
+              new VersionPageReader(
+                  chunkMetaData.getVersion(),
+                  chunkMetaData.getOffsetOfChunkHeader(),
+                  iPageReader,
+                  true));
+        }
+      } else {
+        for (int i = pageReaderList.size() - 1; i >= 0; i--) {
+          seqPageReaders.add(
+              new VersionPageReader(
+                  chunkMetaData.getVersion(),
+                  chunkMetaData.getOffsetOfChunkHeader(),
+                  pageReaderList.get(i),
+                  true));
+        }
+      }
+    } else {
+      pageReaderList.forEach(
+          pageReader ->
+              unSeqPageReaders.add(
+                  new VersionPageReader(
+                      chunkMetaData.getVersion(),
+                      chunkMetaData.getOffsetOfChunkHeader(),
+                      pageReader,
+                      false)));
+    }
+  }
+
+  private void addTotalPageNumInTracing(long queryId, int pageNum) {
+    TracingManager.getInstance().addTotalPageNum(queryId, pageNum);
+  }
+
+  /**
+   * This method should be called after calling hasNextPage.
+   *
+   * <p>hasNextPage may cache firstPageReader if it is not overlapped, or cache a BatchData if the
+   * first page is overlapped.
+   */
+  boolean isPageOverlapped() throws IOException {
+
+    /*
+     * has an overlapped page
+     */
+    if (hasCachedNextOverlappedPage) {
+      return true;
+    }
+
+    /*
+     * has a non-overlapped page in firstPageReader
+     */
+    if (mergeReader.hasNextTimeValuePair()
+        && ((orderUtils.getAscending()
+                && mergeReader.currentTimeValuePair().getTimestamp()
+                    <= firstPageReader.getStatistics().getEndTime())
+            || (!orderUtils.getAscending()
+                && mergeReader.currentTimeValuePair().getTimestamp()
+                    >= firstPageReader.getStatistics().getStartTime()))) {
+      throw new IOException("overlapped data should be consumed first");
+    }
+
+    Statistics firstPageStatistics = firstPageReader.getStatistics();
+
+    return !unSeqPageReaders.isEmpty()
+        && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics());
+  }
+
+  Statistics currentPageStatistics() {
+    if (firstPageReader == null) {
+      return null;
+    }
+    return firstPageReader.getStatistics();
+  }
+
+  Statistics currentPageStatistics(int index) throws IOException {
+    if (firstPageReader == null) {
+      return null;
+    }
+    if (!(firstPageReader.isAlignedPageReader())) {
+      throw new IOException("Can only get statistics by index from AlignedPageReader");
+    }
+    return firstPageReader.getStatistics(index);
+  }
+
+  Statistics currentPageTimeStatistics() throws IOException {
+    if (firstPageReader == null) {
+      return null;
+    }
+    if (!(firstPageReader.isAlignedPageReader())) {
+      throw new IOException("Can only get statistics of time column from AlignedPageReader");
+    }
+    return firstPageReader.getTimeStatistics();
+  }
+
+  boolean currentPageModified() throws IOException {
+    if (firstPageReader == null) {
+      throw new IOException("no first page");
+    }
+    return firstPageReader.isModified();
+  }
+
+  void skipCurrentPage() {
+    firstPageReader = null;
+  }
+
+  /** This method should only be used when the method isPageOverlapped() returns true. */
+  BatchData nextPage() throws IOException {
+
+    if (!hasNextPage() && QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+      throw new IOException("no next page, neither non-overlapped nor overlapped");
+    }
+
+    if (hasCachedNextOverlappedPage) {
+      hasCachedNextOverlappedPage = false;
+      return cachedBatchData;
+    } else {
+
+      /*
+       * next page is not overlapped, push down value filter if it exists
+       */
+      if (valueFilter != null) {
+        firstPageReader.setFilter(valueFilter);
+      }
+      BatchData batchData = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
+      firstPageReader = null;
+
+      return batchData;
+    }
+  }
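+
+  /*
+   * Page-level sketch (illustrative only; whether statistics alone suffice depends on the query):
+   * after hasNextPage() returns true, a caller can either materialize the rows or, for a
+   * non-overlapped and unmodified page, answer from statistics and skip the page:
+   *
+   *   if (reader.hasNextPage()) {
+   *     if (!reader.isPageOverlapped() && !reader.currentPageModified()) {
+   *       Statistics stats = reader.currentPageStatistics(); // answer from metadata
+   *       reader.skipCurrentPage();
+   *     } else {
+   *       BatchData batch = reader.nextPage(); // merged, filtered rows
+   *     }
+   *   }
+   */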
+
+  /**
+   * Read overlapped data up to currentLargestEndTime in mergeReader; if the current batch contains
+   * no data, read up to the next currentLargestEndTime again.
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  public boolean hasNextOverlappedPage() throws IOException {
+
+    if (hasCachedNextOverlappedPage) {
+      return true;
+    }
+
+    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+    while (true) {
+
+      // may have overlapped data
+      if (mergeReader.hasNextTimeValuePair()) {
+
+        cachedBatchData =
+            BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true);
+        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+        while (mergeReader.hasNextTimeValuePair()) {
+
+          /*
+           * get current first point in mergeReader, this maybe overlapped later
+           */
+          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+            /*
+             * when the merged point exceeds the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+             * 1. has cached batch data: we don't need to read more data, just use the cached data later
+             * 2. has first page reader, which means the first page reader's endTime < currentTimeValuePair.getTimestamp(),
+             * so we could just use the first page reader later
+             * 3. sequence page readers are not empty, which means the first sequence page reader's endTime < currentTimeValuePair.getTimestamp(),
+             * so we could use the first sequence page reader later
+             */
+            if (cachedBatchData.hasCurrent()
+                || firstPageReader != null
+                || !seqPageReaders.isEmpty()) {
+              break;
+            }
+            // so, we don't have other data except mergeReader
+            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+          }
+
+          // unpack all overlapped data for the first timeValuePair
+          unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+              timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+          unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+          // update if there are unpacked unSeqPageReaders
+          timeValuePair = mergeReader.currentTimeValuePair();
+
+          // from now on, the unsequence readers are fully unpacked, so we don't need to consider
+          // them; we may have a first page reader now
+          if (firstPageReader != null) {
+            // if the current timeValuePair exceeds the first page reader's end time, we just use
+            // the cached data
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < firstPageReader.getStatistics().getStartTime())) {
+              cachedBatchData.flip();
+              hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+              return hasCachedNextOverlappedPage;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+              // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+              // and update endTime to the max end time
+              mergeReader.addReader(
+                  firstPageReader
+                      .getAllSatisfiedPageData(orderUtils.getAscending())
+                      .getBatchDataIterator(),
+                  firstPageReader.version,
+                  orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime =
+                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
+              firstPageReader = null;
+            }
+          }
+
+          // the seq page readers is not empty, just like first page reader
+          if (!seqPageReaders.isEmpty()) {
+            if ((orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        > seqPageReaders.get(0).getStatistics().getEndTime())
+                || (!orderUtils.getAscending()
+                    && timeValuePair.getTimestamp()
+                        < seqPageReaders.get(0).getStatistics().getStartTime())) {
+              cachedBatchData.flip();
+              hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+              return hasCachedNextOverlappedPage;
+            } else if (orderUtils.isOverlapped(
+                timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+              VersionPageReader pageReader = seqPageReaders.remove(0);
+              mergeReader.addReader(
+                  pageReader
+                      .getAllSatisfiedPageData(orderUtils.getAscending())
+                      .getBatchDataIterator(),
+                  pageReader.version,
+                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+                  context);
+              currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+            }
+          }
+
+          /*
+           * get the latest first point in mergeReader
+           */
+          timeValuePair = mergeReader.nextTimeValuePair();
+
+          Object valueForFilter = timeValuePair.getValue().getValue();
+
+          // TODO: fix value filter firstNotNullObject; currently, a value filter only accepts an
+          // AlignedPath with a single sub sensor
+          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+            for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+              if (tsPrimitiveType != null) {
+                valueForFilter = tsPrimitiveType.getValue();
+                break;
+              }
+            }
+          }
+
+          if (valueFilter == null
+              || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            cachedBatchData.putAnObject(
+                timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+          }
+        }
+        cachedBatchData.flip();
+        hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+        /*
+         * if current overlapped page has valid data, return, otherwise read next overlapped page
+         */
+        if (hasCachedNextOverlappedPage) {
+          return true;
+        } else if (mergeReader.hasNextTimeValuePair()) {
+          // condition: seqPage.endTime < mergeReader.currentTime
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+  }
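+
+  /*
+   * Worked example (ascending order, illustrative): with a seq page covering [0, 100] and an
+   * unseq page covering [50, 150], the unseq page is pushed into mergeReader, the seq page is
+   * added once its range is reached, and currentPageEndPointTime shrinks to 100; one merged
+   * batch is produced for t <= 100, while the unseq remainder (101, 150] stays in mergeReader
+   * for the next call.
+   */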
+
+  private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) {
+    if (orderUtils.getAscending()) {
+      return Math.min(currentPageEndPointTime, pageReader.getStatistics().getEndTime());
+    } else {
+      return Math.max(currentPageEndPointTime, pageReader.getStatistics().getStartTime());
+    }
+  }
+
+  private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+
+    /*
+     * no cached page readers
+     */
+    if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
+      return;
+    }
+
+    /*
+     * init firstPageReader
+     */
+    if (firstPageReader == null) {
+      initFirstPageReader();
+    }
+
+    long currentPageEndpointTime;
+    if (mergeReader.hasNextTimeValuePair()) {
+      currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
+    } else {
+      currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+    }
+
+    /*
+     * put all currently directly overlapped unseq page reader to merge reader
+     */
+    unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
+  }
+
+  public void initFirstPageReader() throws IOException {
+    while (this.firstPageReader == null) {
+      VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders();
+
+      // unpack overlapped page using current page reader
+      if (firstPageReader != null) {
+        long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+        unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
+        unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
+        unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+
+        // after unpacking, this page must still be the first page
+        if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
+          this.firstPageReader = firstPageReader;
+          if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) {
+            seqPageReaders.remove(0);
+            break;
+          } else if (!unSeqPageReaders.isEmpty()
+              && firstPageReader.equals(unSeqPageReaders.peek())) {
+            unSeqPageReaders.poll();
+            break;
+          }
+        }
+      } else {
+        return;
+      }
+    }
+  }
+
+  // we use get() and peek() here (without removing), since the candidate may no longer be the
+  // first page reader after unpacking
+  private VersionPageReader getFirstPageReaderFromCachedReaders() {
+    VersionPageReader firstPageReader = null;
+    if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
+      if (orderUtils.isTakeSeqAsFirst(
+          seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) {
+        firstPageReader = seqPageReaders.get(0);
+      } else {
+        firstPageReader = unSeqPageReaders.peek();
+      }
+    } else if (!seqPageReaders.isEmpty()) {
+      firstPageReader = seqPageReaders.get(0);
+    } else if (!unSeqPageReaders.isEmpty()) {
+      firstPageReader = unSeqPageReaders.peek();
+    }
+    return firstPageReader;
+  }
+
+  private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
+      throws IOException {
+    while (!unSeqPageReaders.isEmpty()
+        && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) {
+      putPageReaderToMergeReader(unSeqPageReaders.poll());
+    }
+    if (firstPageReader != null
+        && !firstPageReader.isSeq()
+        && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
+      putPageReaderToMergeReader(firstPageReader);
+      firstPageReader = null;
+    }
+  }
+
+  private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
+    mergeReader.addReader(
+        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
+        pageReader.version,
+        orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+        context);
+  }
+
+  public BatchData nextOverlappedPage() throws IOException {
+    if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
+      hasCachedNextOverlappedPage = false;
+      return cachedBatchData;
+    }
+    throw new IOException("No more batch data");
+  }
+
+  private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) {
+    return tsFileResources.stream()
+        .sorted(orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource)))
+        .collect(Collectors.toCollection(LinkedList::new));
+  }
+
+  /**
+   * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata
+   *
+   * <p>Because there may be too many files in the user's scenario, we cannot open all the chunks
+   * at once, which may cause OOM, so we only unpack one file at a time when needed. This approach
+   * is likely to be slower, but it keeps the system running smoothly.
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  protected void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOException {
+    /*
+     * Fill sequence TimeSeriesMetadata List until it is not empty
+     */
+    while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
+      unpackSeqTsFileResource();
+    }
+
+    /*
+     * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
+     */
+    while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
+      unpackUnseqTsFileResource();
+    }
+
+    /*
+     * find end time of the first TimeSeriesMetadata
+     */
+    long endTime = -1L;
+    if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+      // only has seq
+      endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
+    } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+      // only has unseq
+      endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
+    } else if (!seqTimeSeriesMetadata.isEmpty()) {
+      // has seq and unseq
+      endTime =
+          orderUtils.getCurrentEndPoint(
+              seqTimeSeriesMetadata.get(0).getStatistics(),
+              unSeqTimeSeriesMetadata.peek().getStatistics());
+    }
+
+    /*
+     * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata
+     */
+    if (endTime != -1) {
+      unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime);
+    }
+
+    /*
+     * update the first TimeSeriesMetadata
+     */
+    if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+      // only has seq
+      firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
+    } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+      // only has unseq
+      firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+    } else if (!seqTimeSeriesMetadata.isEmpty()) {
+      // has seq and unseq
+      if (orderUtils.isTakeSeqAsFirst(
+          seqTimeSeriesMetadata.get(0).getStatistics(),
+          unSeqTimeSeriesMetadata.peek().getStatistics())) {
+        firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
+      } else {
+        firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+      }
+    }
+    if (valueFilter != null
+        && firstTimeSeriesMetadata != null
+        && !isFileOverlapped()
+        && !firstTimeSeriesMetadata.isModified()
+        && !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
+      firstTimeSeriesMetadata = null;
+    }
+  }
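+
+  /*
+   * Example (ascending, illustrative): with seq files S1 [0, 100] and S2 [101, 200] plus an unseq
+   * file U1 [80, 120], the overlap check time is min(100, 120) = 100, so only S1 and U1 are
+   * unpacked; S2 stays untouched until its time range is actually reached.
+   */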
+
+  protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
+      throws IOException {
+    while (orderUtils.hasNextUnseqResource()
+        && orderUtils.isOverlapped(endpointTime, orderUtils.getNextUnseqFileResource(false))) {
+      unpackUnseqTsFileResource();
+    }
+    while (orderUtils.hasNextSeqResource()
+        && orderUtils.isOverlapped(endpointTime, orderUtils.getNextSeqFileResource(false))) {
+      unpackSeqTsFileResource();
+    }
+  }
+
+  protected void unpackSeqTsFileResource() throws IOException {
+    ITimeSeriesMetadata timeseriesMetadata =
+        loadTimeSeriesMetadata(
+            orderUtils.getNextSeqFileResource(true),
+            seriesPath,
+            context,
+            getAnyFilter(),
+            allSensors);
+    if (timeseriesMetadata != null) {
+      timeseriesMetadata.setSeq(true);
+      seqTimeSeriesMetadata.add(timeseriesMetadata);
+    }
+  }
+
+  protected void unpackUnseqTsFileResource() throws IOException {
+    ITimeSeriesMetadata timeseriesMetadata =
+        loadTimeSeriesMetadata(
+            orderUtils.getNextUnseqFileResource(true),
+            seriesPath,
+            context,
+            getAnyFilter(),
+            allSensors);
+    if (timeseriesMetadata != null) {
+      timeseriesMetadata.setModified(true);
+      timeseriesMetadata.setSeq(false);
+      unSeqTimeSeriesMetadata.add(timeseriesMetadata);
+    }
+  }
+
+  protected ITimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, seriesPath, context, filter, allSensors);
+  }
+
+  protected Filter getAnyFilter() {
+    return timeFilter != null ? timeFilter : valueFilter;
+  }
+
+  void setTimeFilter(long timestamp) {
+    ((UnaryFilter) timeFilter).setValue(timestamp);
+  }
+
+  Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  private class VersionPageReader {
+
+    protected MergeReaderPriority version;
+    protected IPageReader data;
+
+    protected boolean isSeq;
+
+    VersionPageReader(long version, long offset, IPageReader data, boolean isSeq) {
+      this.version = new MergeReaderPriority(version, offset);
+      this.data = data;
+      this.isSeq = isSeq;
+    }
+
+    public boolean isAlignedPageReader() {
+      return data instanceof IAlignedPageReader;
+    }
+
+    Statistics getStatistics() {
+      return data.getStatistics();
+    }
+
+    Statistics getStatistics(int index) throws IOException {
+      if (!(data instanceof IAlignedPageReader)) {
+        throw new IOException("Can only get statistics by index from AlignedPageReader");
+      }
+      return ((IAlignedPageReader) data).getStatistics(index);
+    }
+
+    Statistics getTimeStatistics() throws IOException {
+      if (!(data instanceof IAlignedPageReader)) {
+        throw new IOException("Can only get statistics of time column from AlignedPageReader");
+      }
+      return ((IAlignedPageReader) data).getTimeStatistics();
+    }
+
+    BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+      return data.getAllSatisfiedPageData(ascending);
+    }
+
+    void setFilter(Filter filter) {
+      data.setFilter(filter);
+    }
+
+    boolean isModified() {
+      return data.isModified();
+    }
+
+    public boolean isSeq() {
+      return isSeq;
+    }
+  }
+
+  public interface TimeOrderUtils {
+
+    long getOrderTime(Statistics<? extends Object> statistics);
+
+    long getOrderTime(TsFileResource fileResource);
+
+    long getOverlapCheckTime(Statistics<? extends Object> range);
+
+    boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right);
+
+    boolean isOverlapped(long time, Statistics<? extends Object> right);
+
+    boolean isOverlapped(long time, TsFileResource right);
+
+    <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
+
+    long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);
+
+    long getCurrentEndPoint(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
+
+    boolean isExcessEndpoint(long time, long endpointTime);
+
+    /** Return true if the first page reader should be taken from the seq readers */
+    boolean isTakeSeqAsFirst(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
+
+    boolean getAscending();
+
+    boolean hasNextSeqResource();
+
+    boolean hasNextUnseqResource();
+
+    TsFileResource getNextSeqFileResource(boolean isDelete);
+
+    TsFileResource getNextUnseqFileResource(boolean isDelete);
+  }
+
+  class DescTimeOrderUtils implements TimeOrderUtils {
+
+    @Override
+    public long getOrderTime(Statistics statistics) {
+      return statistics.getEndTime();
+    }
+
+    @Override
+    public long getOrderTime(TsFileResource fileResource) {
+      return fileResource.getEndTime(seriesPath.getDevice());
+    }
+
+    @Override
+    public long getOverlapCheckTime(Statistics range) {
+      return range.getStartTime();
+    }
+
+    @Override
+    public boolean isOverlapped(Statistics left, Statistics right) {
+      return left.getStartTime() <= right.getEndTime();
+    }
+
+    @Override
+    public boolean isOverlapped(long time, Statistics right) {
+      return time <= right.getEndTime();
+    }
+
+    @Override
+    public boolean isOverlapped(long time, TsFileResource right) {
+      return time <= right.getEndTime(seriesPath.getDevice());
+    }
+
+    @Override
+    public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
+      Objects.requireNonNull(keyExtractor);
+      return (Comparator<T> & Serializable)
+          (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1));
+    }
+
+    @Override
+    public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+      return Math.max(time, statistics.getStartTime());
+    }
+
+    @Override
+    public long getCurrentEndPoint(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+      return Math.max(seqStatistics.getStartTime(), unseqStatistics.getStartTime());
+    }
+
+    @Override
+    public boolean isExcessEndpoint(long time, long endpointTime) {
+      return time < endpointTime;
+    }
+
+    @Override
+    public boolean isTakeSeqAsFirst(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+      return seqStatistics.getEndTime() > unseqStatistics.getEndTime();
+    }
+
+    @Override
+    public boolean getAscending() {
+      return false;
+    }
+
+    @Override
+    public boolean hasNextSeqResource() {
+      while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+        TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+        if (tsFileResource != null
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
+          break;
+        }
+        curSeqFileIndex--;
+      }
+      return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+    }
+
+    @Override
+    public boolean hasNextUnseqResource() {
+      while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+        TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+        if (tsFileResource != null
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
+          break;
+        }
+        curUnseqFileIndex++;
+      }
+      return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+    }
+
+    @Override
+    public TsFileResource getNextSeqFileResource(boolean isDelete) {
+      TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+      if (isDelete) {
+        curSeqFileIndex--;
+        if (context.isEnableTracing()) {
+          TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, true);
+        }
+      }
+      return tsFileResource;
+    }
+
+    @Override
+    public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+      TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+      if (isDelete) {
+        curUnseqFileIndex++;
+        if (context.isEnableTracing()) {
+          TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, false);
+        }
+      }
+      return tsFileResource;
+    }
+  }
+
+  class AscTimeOrderUtils implements TimeOrderUtils {
+
+    @Override
+    public long getOrderTime(Statistics statistics) {
+      return statistics.getStartTime();
+    }
+
+    @Override
+    public long getOrderTime(TsFileResource fileResource) {
+      return fileResource.getStartTime(seriesPath.getDevice());
+    }
+
+    @Override
+    public long getOverlapCheckTime(Statistics range) {
+      return range.getEndTime();
+    }
+
+    @Override
+    public boolean isOverlapped(Statistics left, Statistics right) {
+      return left.getEndTime() >= right.getStartTime();
+    }
+
+    @Override
+    public boolean isOverlapped(long time, Statistics right) {
+      return time >= right.getStartTime();
+    }
+
+    @Override
+    public boolean isOverlapped(long time, TsFileResource right) {
+      return time >= right.getStartTime(seriesPath.getDevice());
+    }
+
+    @Override
+    public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
+      Objects.requireNonNull(keyExtractor);
+      return (Comparator<T> & Serializable)
+          (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c1), keyExtractor.applyAsLong(c2));
+    }
+
+    @Override
+    public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+      return Math.min(time, statistics.getEndTime());
+    }
+
+    @Override
+    public long getCurrentEndPoint(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+      return Math.min(seqStatistics.getEndTime(), unseqStatistics.getEndTime());
+    }
+
+    @Override
+    public boolean isExcessEndpoint(long time, long endpointTime) {
+      return time > endpointTime;
+    }
+
+    @Override
+    public boolean isTakeSeqAsFirst(
+        Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+      return seqStatistics.getStartTime() < unseqStatistics.getStartTime();
+    }
+
+    @Override
+    public boolean getAscending() {
+      return true;
+    }
+
+    @Override
+    public boolean hasNextSeqResource() {
+      while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+        TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+        if (tsFileResource != null
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
+          break;
+        }
+        curSeqFileIndex++;
+      }
+      return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+    }
+
+    @Override
+    public boolean hasNextUnseqResource() {
+      while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+        TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+        if (tsFileResource != null
+            && tsFileResource.isSatisfied(
+                seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
+          break;
+        }
+        curUnseqFileIndex++;
+      }
+      return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+    }
+
+    @Override
+    public TsFileResource getNextSeqFileResource(boolean isDelete) {
+      TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+      if (isDelete) {
+        curSeqFileIndex++;
+        if (context.isEnableTracing()) {
+          TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, true);
+        }
+      }
+      return tsFileResource;
+    }
+
+    @Override
+    public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+      TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+      if (isDelete) {
+        curUnseqFileIndex++;
+        if (context.isEnableTracing()) {
+          TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, false);
+        }
+      }
+      return tsFileResource;
+    }
+  }
+
+  public TimeOrderUtils getOrderUtils() {
+    return orderUtils;
+  }
+
+  @TestOnly
+  public Filter getValueFilter() {
+    return valueFilter;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java
new file mode 100644
index 0000000000..44cd258cb7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SeriesRawDataPrefetchReader implements ManagedSeriesReader {
+
+  private final SeriesBitmapReader seriesReader;
+
+  private boolean hasRemaining;
+  private boolean managedByQueryManager;
+
+  private BatchData batchData;
+  private boolean hasCachedBatchData = false;
+
+  public SeriesRawDataPrefetchReader(SeriesBitmapReader seriesReader) {
+    this.seriesReader = seriesReader;
+  }
+
+  public SeriesRawDataPrefetchReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    this.seriesReader =
+        seriesPath.createSeriesBitmapReader(
+            allSensors,
+            dataType,
+            context,
+            dataSource,
+            timeFilter,
+            valueFilter,
+            fileFilter,
+            ascending);
+  }
+
+  /**
+   * Consume cached page data first, then chunk data, then the next file, so that the caller sees
+   * a continuous stream of batches
+   */
+  @Override
+  public boolean hasNextBatch() throws IOException {
+
+    if (hasCachedBatchData) {
+      return true;
+    }
+
+    /*
+     * consume page data firstly
+     */
+    if (readPageData()) {
+      hasCachedBatchData = true;
+      return true;
+    }
+
+    /*
+     * consume chunk data secondly
+     */
+    if (readChunkData()) {
+      hasCachedBatchData = true;
+      return true;
+    }
+
+    /*
+     * consume next file finally
+     */
+    while (seriesReader.hasNextFile()) {
+      if (readChunkData()) {
+        hasCachedBatchData = true;
+        return true;
+      }
+    }
+    return hasCachedBatchData;
+  }
+
+  @Override
+  public BatchData nextBatch() throws IOException {
+    if (hasCachedBatchData || hasNextBatch()) {
+      hasCachedBatchData = false;
+      return batchData;
+    }
+    throw new IOException("no next batch");
+  }
+
+  @Override
+  public void close() throws IOException {
+    // no resources need to close
+  }
+
+  @Override
+  public boolean isManagedByQueryManager() {
+    return managedByQueryManager;
+  }
+
+  @Override
+  public void setManagedByQueryManager(boolean managedByQueryManager) {
+    this.managedByQueryManager = managedByQueryManager;
+  }
+
+  @Override
+  public boolean hasRemaining() {
+    return hasRemaining;
+  }
+
+  @Override
+  public void setHasRemaining(boolean hasRemaining) {
+    this.hasRemaining = hasRemaining;
+  }
+
+  private boolean readChunkData() throws IOException {
+    while (seriesReader.hasNextChunk()) {
+      if (readPageData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData() throws IOException {
+    while (seriesReader.hasNextPage()) {
+      batchData = seriesReader.nextPage();
+      if (!isEmpty(batchData)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isEmpty(BatchData batchData) {
+    return batchData == null || !batchData.hasCurrent();
+  }
+
+  @TestOnly
+  public SeriesBitmapReader getSeriesReader() {
+    return seriesReader;
+  }
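+
+  /*
+   * Usage sketch (illustrative; the variable names and filter choices below are assumptions, not
+   * part of this commit):
+   *
+   *   ManagedSeriesReader reader =
+   *       new SeriesRawDataPrefetchReader(
+   *           seriesPath, allSensors, dataType, context, dataSource,
+   *           timeFilter, null, null, true); // no value filter, no file filter, ascending
+   *   while (reader.hasNextBatch()) {
+   *     BatchData batch = reader.nextBatch();
+   *   }
+   */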
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 6273add29f..a983e6db5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader;
 import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader;
 import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
 import org.apache.iotdb.db.query.reader.chunk.metadata.MemAlignedChunkMetadataLoader;
@@ -263,4 +264,20 @@ public class FileLoaderUtils {
     IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
     return chunkReader.loadPageReaderList();
   }
+
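+  /**
+   * Same contract as loadPageReaderList, but aligned chunks are routed to the prefetch chunk
+   * reader so that only the time column and the value bitmaps are deserialized; non-aligned
+   * chunks fall back to the ordinary chunk reader.
+   */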
+  public static List<IPageReader> loadPagePrefetchReaderList(
+      IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
+    if (chunkMetaData == null) {
+      throw new IOException("Can't init null chunkMeta");
+    }
+    IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
+    IChunkReader chunkReader;
+    if (chunkLoader instanceof DiskAlignedChunkLoader) {
+      chunkReader =
+          ((DiskAlignedChunkLoader) chunkLoader).getChunkPrefetchReader(chunkMetaData, timeFilter);
+    } else {
+      chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
+    }
+    return chunkReader.loadPageReaderList();
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
index 3dcec665fe..c83a41795a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.AlignedPagePrefetchReader;
 import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader;
 
 import java.io.IOException;
@@ -87,6 +88,25 @@ public class AlignedChunkReader implements IChunkReader {
     initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
   }
 
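+  /**
+   * Constructs a reader that builds prefetch page readers (time column + value bitmaps only).
+   * Note: the prefetch flag is never read in the body; it only distinguishes this overload from
+   * the ordinary constructor above.
+   */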
+  public AlignedChunkReader(
+      Chunk timeChunk, List<Chunk> valueChunkList, Filter filter, boolean prefetch)
+      throws IOException {
+    this.filter = filter;
+    this.timeChunkDataBuffer = timeChunk.getData();
+    this.valueDeleteIntervalList = new ArrayList<>();
+    this.timeChunkHeader = timeChunk.getHeader();
+    this.unCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType());
+    List<Statistics> valueChunkStatisticsList = new ArrayList<>();
+    valueChunkList.forEach(
+        chunk -> {
+          valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader());
+          valueChunkDataBufferList.add(chunk == null ? null : chunk.getData());
+          valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic());
+          valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList());
+        });
+    initAllPrefetchPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
+  }
+
   /** construct all the page readers in this chunk */
   private void initAllPageReaders(
       Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList)
@@ -138,6 +158,56 @@ public class AlignedChunkReader implements IChunkReader {
     }
   }
 
+  private void initAllPrefetchPageReaders(
+      Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList)
+      throws IOException {
+    // construct next satisfied page header
+    while (timeChunkDataBuffer.remaining() > 0) {
+      // deserialize a PageHeader from chunkDataBuffer
+      PageHeader timePageHeader;
+      List<PageHeader> valuePageHeaderList = new ArrayList<>();
+
+      boolean exists = false;
+      // this chunk has only one page
+      if ((timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+        timePageHeader = PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics);
+        for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
+          if (valueChunkDataBufferList.get(i) != null) {
+            exists = true;
+            valuePageHeaderList.add(
+                PageHeader.deserializeFrom(
+                    valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i)));
+          } else {
+            valuePageHeaderList.add(null);
+          }
+        }
+      } else { // this chunk has more than one page
+        timePageHeader =
+            PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType());
+        for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
+          if (valueChunkDataBufferList.get(i) != null) {
+            exists = true;
+            valuePageHeaderList.add(
+                PageHeader.deserializeFrom(
+                    valueChunkDataBufferList.get(i), valueChunkHeaderList.get(i).getDataType()));
+          } else {
+            valuePageHeaderList.add(null);
+          }
+        }
+      }
+      // if the current page satisfies the time filter
+      if (exists && timePageSatisfied(timePageHeader)) {
+        AlignedPagePrefetchReader alignedPageReader =
+            constructPrefetchPageReaderForNextPage(timePageHeader, valuePageHeaderList);
+        if (alignedPageReader != null) {
+          pageReaderList.add(alignedPageReader);
+        }
+      } else {
+        skipBytesInStreamByLength(timePageHeader, valuePageHeaderList);
+      }
+    }
+  }
+
   /** used for time page filter */
   private boolean timePageSatisfied(PageHeader timePageHeader) {
     long startTime = timePageHeader.getStatistics().getStartTime();
@@ -219,6 +289,65 @@ public class AlignedChunkReader implements IChunkReader {
     return alignedPageReader;
   }
 
+  private AlignedPagePrefetchReader constructPrefetchPageReaderForNextPage(
+      PageHeader timePageHeader, List<PageHeader> valuePageHeader) throws IOException {
+    PageInfo timePageInfo = new PageInfo();
+    getPageInfo(timePageHeader, timeChunkDataBuffer, timeChunkHeader, timePageInfo);
+    PageInfo valuePageInfo = new PageInfo();
+    List<PageHeader> valuePageHeaderList = new ArrayList<>();
+    List<ByteBuffer> valuePageDataList = new ArrayList<>();
+    List<TSDataType> valueDataTypeList = new ArrayList<>();
+    List<Decoder> valueDecoderList = new ArrayList<>();
+    boolean exist = false;
+    for (int i = 0; i < valuePageHeader.size(); i++) {
+      if (valuePageHeader.get(i) == null
+          || valuePageHeader.get(i).getUncompressedSize() == 0) { // Empty Page
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      } else if (pageSatisfied(
+          valuePageHeader.get(i),
+          valueDeleteIntervalList.get(i))) { // if the page is satisfied, deserialize it
+        getPageInfo(
+            valuePageHeader.get(i),
+            valueChunkDataBufferList.get(i),
+            valueChunkHeaderList.get(i),
+            valuePageInfo);
+        valuePageHeaderList.add(valuePageInfo.pageHeader);
+        valuePageDataList.add(valuePageInfo.pageData);
+        valueDataTypeList.add(valuePageInfo.dataType);
+        valueDecoderList.add(valuePageInfo.decoder);
+        exist = true;
+      } else { // if the page is not satisfied, just skip it
+        valueChunkDataBufferList
+            .get(i)
+            .position(
+                valueChunkDataBufferList.get(i).position()
+                    + valuePageHeader.get(i).getCompressedSize());
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      }
+    }
+    if (!exist) {
+      return null;
+    }
+    AlignedPagePrefetchReader alignedPageReader =
+        new AlignedPagePrefetchReader(
+            timePageHeader,
+            timePageInfo.pageData,
+            timeDecoder,
+            valuePageHeaderList,
+            valuePageDataList,
+            valueDataTypeList,
+            valueDecoderList,
+            filter);
+    alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList);
+    return alignedPageReader;
+  }
+
   /**
    * deserialize the page
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java
new file mode 100644
index 0000000000..8fd7207052
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.reader.page;
+
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+// the prefetch reader reads only the time column plus each value column's bitmap, without
+// deserializing the values themselves; one could also read the bitmap alone, without time or
+// values
+
+public class AlignedPagePrefetchReader implements IPageReader, IAlignedPageReader {
+
+  private final TimePageReader timePageReader;
+  private final List<ValuePageBitmapReader> valuePageReaderList;
+  private final int valueCount;
+  private Filter filter;
+  private boolean isModified;
+
+  public AlignedPagePrefetchReader(
+      PageHeader timePageHeader,
+      ByteBuffer timePageData,
+      Decoder timeDecoder,
+      List<PageHeader> valuePageHeaderList,
+      List<ByteBuffer> valuePageDataList,
+      List<TSDataType> valueDataTypeList,
+      List<Decoder> valueDecoderList,
+      Filter filter) {
+    timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
+    isModified = timePageReader.isModified();
+    valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
+    for (int i = 0; i < valuePageHeaderList.size(); i++) {
+      if (valuePageHeaderList.get(i) != null) {
+        ValuePageBitmapReader valuePageReader =
+            new ValuePageBitmapReader(
+                valuePageHeaderList.get(i),
+                valuePageDataList.get(i),
+                valueDataTypeList.get(i),
+                valueDecoderList.get(i));
+        valuePageReaderList.add(valuePageReader);
+        isModified = isModified || valuePageReader.isModified();
+      } else {
+        valuePageReaderList.add(null);
+      }
+    }
+    this.filter = filter;
+    this.valueCount = valuePageReaderList.size();
+  }
+
+  @Override
+  public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+    // implemented only to satisfy the IPageReader interface: the values emitted here are
+    // bitmap-derived placeholders, never decoded from the value columns
+    BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
+    int timeIndex = -1;
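+    // timeIndex tracks the row position within the page and indexes into each value bitmap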
+    while (timePageReader.hasNextTime()) {
+      long timestamp = timePageReader.nextTime();
+      timeIndex++;
+      // if all the sub sensors' values are null in the current row, just discard it
+      boolean isNull = true;
+      Object notNullObject = null;
+      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
+      for (int i = 0; i < v.length; i++) {
+        ValuePageBitmapReader pageReader = valuePageReaderList.get(i);
+        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
+        if (v[i] != null) {
+          isNull = false;
+          notNullObject = v[i].getValue();
+        }
+      }
+      // currently, a value filter is only supported for an AlignedPath with a single sub sensor
+      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
+        pageData.putVector(timestamp, v);
+      }
+    }
+    return pageData.flip();
+  }
+
+  public void setDeleteIntervalList(List<List<TimeRange>> list) {
+    for (int i = 0; i < valueCount; i++) {
+      if (valuePageReaderList.get(i) != null) {
+        valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
+      }
+    }
+  }
+
+  @Override
+  public Statistics getStatistics() {
+    return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
+        ? valuePageReaderList.get(0).getStatistics()
+        : timePageReader.getStatistics();
+  }
+
+  @Override
+  public Statistics getStatistics(int index) {
+    ValuePageBitmapReader valuePageReader = valuePageReaderList.get(index);
+    return valuePageReader == null ? null : valuePageReader.getStatistics();
+  }
+
+  @Override
+  public Statistics getTimeStatistics() {
+    return timePageReader.getStatistics();
+  }
+
+  @Override
+  public void setFilter(Filter filter) {
+    if (this.filter == null) {
+      this.filter = filter;
+    } else {
+      this.filter = new AndFilter(this.filter, filter);
+    }
+  }
+
+  @Override
+  public boolean isModified() {
+    return isModified;
+  }
+}
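
For reference, a minimal sketch of how this prefetch reader might be driven. The
locals `timeHeader`, `timeBuffer`, `timeDecoder`, `headers`, `buffers`, `types`,
and `decoders` are hypothetical stand-ins for the page headers, buffers, and
decoders that AlignedChunkReader assembles; they are not part of this patch:

    // build the reader over one aligned page; a null filter accepts every row
    AlignedPagePrefetchReader reader =
        new AlignedPagePrefetchReader(
            timeHeader, timeBuffer, timeDecoder,
            headers, buffers, types, decoders,
            null);
    // rows whose bitmaps are all zero are dropped; surviving rows carry
    // placeholder values, so only existence (not content) is materialized
    BatchData rows = reader.getAllSatisfiedPageData(true);
    while (rows.hasCurrent()) {
      long t = rows.currentTime(); // a row with at least one non-null sub sensor
      rows.next();
    }
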
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java
new file mode 100644
index 0000000000..5869b288af
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.reader.page;
+
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class ValuePageBitmapReader {
+
+  private static final int MASK = 0x80;
+
+  private final PageHeader pageHeader;
+
+  private final TSDataType dataType;
+
+  /** decoder for value column */
+  private final Decoder valueDecoder;
+
+  private byte[] bitmap;
+
+  private int size;
+
+  /** value column in memory */
+  protected ByteBuffer valueBuffer;
+
+  /** A list of deleted intervals. */
+  private List<TimeRange> deleteIntervalList;
+
+  private int deleteCursor = 0;
+
+  public ValuePageBitmapReader(
+      PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder) {
+    this.dataType = dataType;
+    this.valueDecoder = valueDecoder;
+    this.pageHeader = pageHeader;
+    if (pageData != null) {
+      splitDataToBitmapAndValue(pageData);
+    }
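+    // after splitDataToBitmapAndValue, pageData's position already sits at the start of the
+    // encoded value section, so this assignment exposes only the values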
+    this.valueBuffer = pageData;
+  }
+
+  private void splitDataToBitmapAndValue(ByteBuffer pageData) {
+    if (!pageData.hasRemaining()) { // Empty Page
+      return;
+    }
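+    // page layout: [row count: int][bitmap: (size + 7) / 8 bytes, MSB first][encoded values]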
+    this.size = ReadWriteIOUtils.readInt(pageData);
+    this.bitmap = new byte[(size + 7) / 8];
+    pageData.get(bitmap);
+    this.valueBuffer = pageData.slice();
+  }
+
+  /**
+   * return a BatchData for the corresponding timeBatch; the BatchData's dataType is the same as
+   * this sub sensor's
+   */
+  public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter filter) {
+    BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false);
+    for (int i = 0; i < timeBatch.length; i++) {
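+      // a cleared bit means row i holds no value for this sub sensor, so skip it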
+      if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+        continue;
+      }
+      long timestamp = timeBatch[i];
+      switch (dataType) {
+        case BOOLEAN:
+          boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
+            pageData.putBoolean(timestamp, aBoolean);
+          }
+          break;
+        case INT32:
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
+            pageData.putInt(timestamp, anInt);
+          }
+          break;
+        case INT64:
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+            pageData.putLong(timestamp, aLong);
+          }
+          break;
+        case FLOAT:
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+            pageData.putFloat(timestamp, aFloat);
+          }
+          break;
+        case DOUBLE:
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+            pageData.putDouble(timestamp, aDouble);
+          }
+          break;
+        case TEXT:
+          Binary aBinary = valueDecoder.readBinary(valueBuffer);
+          if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
+            pageData.putBinary(timestamp, aBinary);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
+      }
+    }
+    return pageData.flip();
+  }
+
+  public TsPrimitiveType nextValue(long timestamp, int timeIndex) {
+    if (valueBuffer == null || ((bitmap[timeIndex / 8] & 0xFF) & (MASK >>> (timeIndex % 8))) == 0) {
+      return null;
+    } else {
+      if (!isDeleted(timestamp)) {
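+        // fixed dummy value marking existence; the actual value is never decoded on the
+        // prefetch path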
+        return new TsPrimitiveType.TsDouble(1.0);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * return the value array for the corresponding times; if this sub sensor doesn't have a value
+   * at a given time, the slot is left null
+   */
+  public TsPrimitiveType[] nextValueBatch(long[] timeBatch) {
+    TsPrimitiveType[] valueBatch = new TsPrimitiveType[size];
+    if (valueBuffer == null) {
+      return valueBatch;
+    }
+    for (int i = 0; i < size; i++) {
+      if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+        continue;
+      }
+      switch (dataType) {
+        case BOOLEAN:
+          boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsBoolean(aBoolean);
+          }
+          break;
+        case INT32:
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsInt(anInt);
+          }
+          break;
+        case INT64:
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsLong(aLong);
+          }
+          break;
+        case FLOAT:
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsFloat(aFloat);
+          }
+          break;
+        case DOUBLE:
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsDouble(aDouble);
+          }
+          break;
+        case TEXT:
+          Binary aBinary = valueDecoder.readBinary(valueBuffer);
+          if (!isDeleted(timeBatch[i])) {
+            valueBatch[i] = new TsPrimitiveType.TsBinary(aBinary);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
+      }
+    }
+    return valueBatch;
+  }
+
+  public Statistics getStatistics() {
+    return pageHeader.getStatistics();
+  }
+
+  public void setDeleteIntervalList(List<TimeRange> list) {
+    this.deleteIntervalList = list;
+  }
+
+  public List<TimeRange> getDeleteIntervalList() {
+    return deleteIntervalList;
+  }
+
+  public boolean isModified() {
+    return pageHeader.isModified();
+  }
+
+  protected boolean isDeleted(long timestamp) {
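+    // deleteCursor only advances, so callers must probe timestamps in ascending order,
+    // matching the page's time order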
+    while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
+      if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
+        return true;
+      } else if (deleteIntervalList.get(deleteCursor).getMax() < timestamp) {
+        deleteCursor++;
+      } else {
+        return false;
+      }
+    }
+    return false;
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+}
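
The bit test used throughout both readers can be exercised on its own; a
self-contained sketch, assuming a hypothetical 10-row page in which rows 0, 3,
and 9 are present:

    // MSB-first bitmap: bit (0x80 >>> (i % 8)) of byte i / 8 marks row i
    byte[] bitmap = {(byte) 0b1001_0000, (byte) 0b0100_0000}; // rows 0, 3, 9 set
    for (int i = 0; i < 10; i++) {
      boolean present = ((bitmap[i / 8] & 0xFF) & (0x80 >>> (i % 8))) != 0;
      System.out.println("row " + i + (present ? " present" : " absent"));
    }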