Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/16 01:58:42 UTC

[iotdb] branch master updated: [IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 70772cb868 [IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)
70772cb868 is described below

commit 70772cb8684315814959974811440f746e5c19c3
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Sat Jul 16 09:58:37 2022 +0800

    [IOTDB-3778] Use SeriesScanUtil in compaction for point reader (#6668)
---
 .../rewrite/task/ReadPointPerformerSubTask.java    |   19 +-
 .../impl/ReadPointCompactionPerformer.java         |   56 +-
 .../engine/compaction/reader/IDataBlockReader.java |   31 +
 .../compaction/reader/SeriesDataBlockReader.java   |  156 ++
 .../fragment/FragmentInstanceContext.java          |   11 +
 .../execution/operator/source/SeriesScanUtil.java  |   17 +-
 .../iotdb/db/query/context/QueryContext.java       |    2 +-
 .../ReadPointCompactionPerformerTest.java          | 1689 ++++++++++----------
 8 files changed, 1104 insertions(+), 877 deletions(-)
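
In short, this change moves the read-point compaction read path off the legacy query layer and onto the MPP scan stack: a FragmentInstanceContext replaces QueryContext, constructReader() now returns an IDataBlockReader backed by SeriesScanUtil, and writeWithReader() consumes TsBlocks through point readers. A minimal sketch of the new entry points as the performer uses them (method and class names are taken from the diff below; queryId, device, measurementIds, measurementSchemas, allSensors, queryDataSource, compactionWriter, subTaskId and isAligned are assumed to be in scope):

    // Build a compaction-only context instead of the old QueryContext.
    FragmentInstanceContext context =
        FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);

    // constructReader() now returns an IDataBlockReader (SeriesDataBlockReader)
    // instead of an IBatchReader (SeriesRawDataBatchReader).
    IDataBlockReader reader =
        ReadPointCompactionPerformer.constructReader(
            device, measurementIds, measurementSchemas, allSensors,
            context, queryDataSource, isAligned);

    // writeWithReader() gained an isAligned flag so it can choose the right
    // point reader over each TsBlock (aligned-row vs. single-column iterator).
    if (reader.hasNextBatch()) {
      compactionWriter.startMeasurement(measurementSchemas, subTaskId);
      ReadPointCompactionPerformer.writeWithReader(
          compactionWriter, reader, subTaskId, isAligned);
      compactionWriter.endMeasurement(subTaskId);
    }
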

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index d22362746c..4280d2a6f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -20,10 +20,10 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -46,7 +46,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private final String device;
   private final Set<String> measurementList;
-  private final QueryContext queryContext;
+  private final FragmentInstanceContext fragmentInstanceContext;
   private final QueryDataSource queryDataSource;
   private final AbstractCompactionWriter compactionWriter;
   private final Map<String, MeasurementSchema> schemaMap;
@@ -55,14 +55,14 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
   public ReadPointPerformerSubTask(
       String device,
       Set<String> measurementList,
-      QueryContext queryContext,
+      FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
       Map<String, MeasurementSchema> schemaMap,
       int taskId) {
     this.device = device;
     this.measurementList = measurementList;
-    this.queryContext = queryContext;
+    this.fragmentInstanceContext = fragmentInstanceContext;
     this.queryDataSource = queryDataSource;
     this.compactionWriter = compactionWriter;
     this.schemaMap = schemaMap;
@@ -74,19 +74,20 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
     for (String measurement : measurementList) {
       List<IMeasurementSchema> measurementSchemas =
           Collections.singletonList(schemaMap.get(measurement));
-      IBatchReader dataBatchReader =
+      IDataBlockReader dataBlockReader =
           ReadPointCompactionPerformer.constructReader(
               device,
               Collections.singletonList(measurement),
               measurementSchemas,
               measurementList,
-              queryContext,
+              fragmentInstanceContext,
               queryDataSource,
               false);
 
-      if (dataBatchReader.hasNextBatch()) {
+      if (dataBlockReader.hasNextBatch()) {
         compactionWriter.startMeasurement(measurementSchemas, taskId);
-        ReadPointCompactionPerformer.writeWithReader(compactionWriter, dataBatchReader, taskId);
+        ReadPointCompactionPerformer.writeWithReader(
+            compactionWriter, dataBlockReader, taskId, false);
         compactionWriter.endMeasurement(taskId);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index eecb159196..a47574a7b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerform
 import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.performer.IUnseqCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
 import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
@@ -39,19 +41,19 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -106,7 +108,8 @@ public class ReadPointCompactionPerformer
   public void perform()
       throws IOException, MetadataException, StorageEngineException, InterruptedException {
     long queryId = QueryResourceManager.getInstance().assignCompactionQueryId();
-    QueryContext queryContext = new QueryContext(queryId);
+    FragmentInstanceContext fragmentInstanceContext =
+        FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
     QueryDataSource queryDataSource = new QueryDataSource(seqFiles, unseqFiles);
     QueryResourceManager.getInstance()
         .getQueryFileManager()
@@ -126,10 +129,10 @@ public class ReadPointCompactionPerformer
 
         if (isAligned) {
           compactAlignedSeries(
-              device, deviceIterator, compactionWriter, queryContext, queryDataSource);
+              device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource);
         } else {
           compactNonAlignedSeries(
-              device, deviceIterator, compactionWriter, queryContext, queryDataSource);
+              device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource);
         }
       }
 
@@ -156,7 +159,7 @@ public class ReadPointCompactionPerformer
       String device,
       MultiTsFileDeviceIterator deviceIterator,
       AbstractCompactionWriter compactionWriter,
-      QueryContext queryContext,
+      FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
       throws IOException, MetadataException {
     MultiTsFileDeviceIterator.AlignedMeasurementIterator alignedMeasurementIterator =
@@ -171,21 +174,21 @@ public class ReadPointCompactionPerformer
         measurementSchemas.stream()
             .map(IMeasurementSchema::getMeasurementId)
             .collect(Collectors.toList());
-    IBatchReader dataBatchReader =
+    IDataBlockReader dataBlockReader =
         constructReader(
             device,
             existedMeasurements,
             measurementSchemas,
             allMeasurements,
-            queryContext,
+            fragmentInstanceContext,
             queryDataSource,
             true);
 
-    if (dataBatchReader.hasNextBatch()) {
+    if (dataBlockReader.hasNextBatch()) {
       // chunkgroup is serialized only when at least one timeseries under this device has data
       compactionWriter.startChunkGroup(device, true);
       compactionWriter.startMeasurement(measurementSchemas, 0);
-      writeWithReader(compactionWriter, dataBatchReader, 0);
+      writeWithReader(compactionWriter, dataBlockReader, 0, true);
       compactionWriter.endMeasurement(0);
       compactionWriter.endChunkGroup();
     }
@@ -195,7 +198,7 @@ public class ReadPointCompactionPerformer
       String device,
       MultiTsFileDeviceIterator deviceIterator,
       AbstractCompactionWriter compactionWriter,
-      QueryContext queryContext,
+      FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
       throws IOException, InterruptedException, IllegalPathException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
@@ -224,7 +227,7 @@ public class ReadPointCompactionPerformer
                   new ReadPointPerformerSubTask(
                       device,
                       measurementsForEachSubTask[i],
-                      queryContext,
+                      fragmentInstanceContext,
                       queryDataSource,
                       compactionWriter,
                       schemaMap,
@@ -341,12 +344,12 @@ public class ReadPointCompactionPerformer
    * @param measurementIds if device is aligned, then measurementIds contain all measurements. If
    *     device is not aligned, then measurementIds only contain one measurement.
    */
-  public static IBatchReader constructReader(
+  public static IDataBlockReader constructReader(
       String deviceId,
       List<String> measurementIds,
       List<IMeasurementSchema> measurementSchemas,
       Set<String> allSensors,
-      QueryContext queryContext,
+      FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource,
       boolean isAlign)
       throws IllegalPathException {
@@ -359,17 +362,24 @@ public class ReadPointCompactionPerformer
       seriesPath = new MeasurementPath(deviceId, measurementIds.get(0), measurementSchemas.get(0));
       tsDataType = measurementSchemas.get(0).getType();
     }
-    return new SeriesRawDataBatchReader(
-        seriesPath, allSensors, tsDataType, queryContext, queryDataSource, null, null, null, true);
+    return new SeriesDataBlockReader(
+        seriesPath, allSensors, tsDataType, fragmentInstanceContext, queryDataSource, true);
   }
 
   public static void writeWithReader(
-      AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException {
+      AbstractCompactionWriter writer, IDataBlockReader reader, int subTaskId, boolean isAligned)
+      throws IOException {
     while (reader.hasNextBatch()) {
-      BatchData batchData = reader.nextBatch();
-      while (batchData.hasCurrent()) {
-        writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId);
-        batchData.next();
+      TsBlock tsBlock = reader.nextBatch();
+      IPointReader pointReader;
+      if (isAligned) {
+        pointReader = tsBlock.getTsBlockAlignedRowIterator();
+      } else {
+        pointReader = tsBlock.getTsBlockSingleColumnIterator();
+      }
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+        writer.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue(), subTaskId);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java
new file mode 100644
index 0000000000..6f5bd080f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/IDataBlockReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.io.IOException;
+
+public interface IDataBlockReader {
+  boolean hasNextBatch() throws IOException;
+
+  TsBlock nextBatch() throws IOException;
+
+  void close() throws IOException;
+}
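
IDataBlockReader mirrors the old IBatchReader contract but yields TsBlocks instead of BatchData. A minimal consumer loop, assuming a reader obtained from constructReader() and a hypothetical consume() sink for the time/value pairs:

    IDataBlockReader reader = ...;  // e.g. from ReadPointCompactionPerformer.constructReader(...)
    try {
      while (reader.hasNextBatch()) {         // may pre-read and cache a block
        TsBlock block = reader.nextBatch();   // returns the cached block
        IPointReader points =
            isAligned
                ? block.getTsBlockAlignedRowIterator()
                : block.getTsBlockSingleColumnIterator();
        while (points.hasNextTimeValuePair()) {
          TimeValuePair tv = points.nextTimeValuePair();
          consume(tv.getTimestamp(), tv.getValue().getValue());  // hypothetical sink
        }
      }
    } finally {
      reader.close();  // currently a no-op for SeriesDataBlockReader, kept for symmetry
    }
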
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java
new file mode 100644
index 0000000000..ef42dbe7bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/SeriesDataBlockReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanUtil;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SeriesDataBlockReader implements IDataBlockReader {
+
+  private final SeriesScanUtil seriesScanUtil;
+
+  private TsBlock tsBlock;
+
+  private boolean hasCachedBatchData = false;
+
+  public SeriesDataBlockReader(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      FragmentInstanceContext context,
+      QueryDataSource dataSource,
+      boolean ascending) {
+    if (seriesPath instanceof AlignedPath) {
+      this.seriesScanUtil =
+          new AlignedSeriesScanUtil(seriesPath, allSensors, context, null, null, ascending);
+    } else if (seriesPath instanceof MeasurementPath) {
+      this.seriesScanUtil =
+          new SeriesScanUtil(seriesPath, allSensors, dataType, context, null, null, ascending);
+    } else {
+      throw new IllegalArgumentException("Should call exact sub class!");
+    }
+    this.seriesScanUtil.initQueryDataSource(dataSource);
+  }
+
+  @TestOnly
+  public SeriesDataBlockReader(
+      PartialPath seriesPath,
+      TSDataType dataType,
+      FragmentInstanceContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      boolean ascending) {
+    Set<String> allSensors = new HashSet<>();
+    if (seriesPath instanceof AlignedPath) {
+      this.seriesScanUtil =
+          new AlignedSeriesScanUtil(seriesPath, allSensors, context, null, null, ascending);
+    } else {
+      allSensors.add(seriesPath.getMeasurement());
+      this.seriesScanUtil =
+          new SeriesScanUtil(seriesPath, allSensors, dataType, context, null, null, ascending);
+    }
+    seriesScanUtil.initQueryDataSource(seqFileResource, unseqFileResource);
+  }
+
+  @Override
+  public boolean hasNextBatch() throws IOException {
+
+    if (hasCachedBatchData) {
+      return true;
+    }
+
+    /*
+     * consume page data firstly
+     */
+    if (readPageData()) {
+      hasCachedBatchData = true;
+      return true;
+    }
+
+    /*
+     * consume chunk data secondly
+     */
+    if (readChunkData()) {
+      hasCachedBatchData = true;
+      return true;
+    }
+
+    /*
+     * consume next file finally
+     */
+    while (seriesScanUtil.hasNextFile()) {
+      if (readChunkData()) {
+        hasCachedBatchData = true;
+        return true;
+      }
+    }
+    return hasCachedBatchData;
+  }
+
+  @Override
+  public TsBlock nextBatch() throws IOException {
+    if (hasCachedBatchData || hasNextBatch()) {
+      hasCachedBatchData = false;
+      return tsBlock;
+    }
+    throw new IOException("no next block");
+  }
+
+  @Override
+  public void close() throws IOException {
+    // no resources need to close
+  }
+
+  private boolean readChunkData() throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      if (readPageData()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData() throws IOException {
+    while (seriesScanUtil.hasNextPage()) {
+      tsBlock = seriesScanUtil.nextPage();
+      if (!isEmpty(tsBlock)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isEmpty(TsBlock tsBlock) {
+    return tsBlock == null || tsBlock.isEmpty();
+  }
+}
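
The updated tests below drive the reader through its @TestOnly constructor, which builds the QueryDataSource directly from seq/unseq TsFileResource lists. A minimal sketch of that test-side usage (path, queryId and the resource lists are assumed to be prepared by the test fixture):

    IDataBlockReader reader =
        new SeriesDataBlockReader(
            path,                        // MeasurementPath or AlignedPath
            TSDataType.INT64,
            FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId),
            seqResources,
            unseqResources,
            true);                       // ascending
    int count = 0;
    while (reader.hasNextBatch()) {
      TsBlock block = reader.nextBatch();
      IBatchDataIterator it = block.getTsBlockSingleColumnIterator();
      while (it.hasNext()) {
        count++;
        it.next();
      }
    }
    reader.close();
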
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index ffc9d101b3..088896c34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -72,6 +72,10 @@ public class FragmentInstanceContext extends QueryContext {
     return instanceContext;
   }
 
+  public static FragmentInstanceContext createFragmentInstanceContextForCompaction(long queryId) {
+    return new FragmentInstanceContext(queryId);
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
     this.id = id;
@@ -79,6 +83,13 @@ public class FragmentInstanceContext extends QueryContext {
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
   }
 
+  // used for compaction
+  private FragmentInstanceContext(long queryId) {
+    this.queryId = queryId;
+    this.id = null;
+    this.stateMachine = null;
+  }
+
   public void start() {
     long now = System.currentTimeMillis();
     executionStartTime.compareAndSet(null, now);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 483ef03afa..254693e7b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -163,6 +164,14 @@ public class SeriesScanUtil {
     orderUtils.setCurSeqFileIndex(dataSource);
   }
 
+  @TestOnly
+  public void initQueryDataSource(
+      List<TsFileResource> seqFileResource, List<TsFileResource> unseqFileResource) {
+    dataSource = new QueryDataSource(seqFileResource, unseqFileResource);
+    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    orderUtils.setCurSeqFileIndex(dataSource);
+  }
+
   protected PriorityMergeReader getPriorityMergeReader() {
     return new PriorityMergeReader();
   }
@@ -175,7 +184,7 @@ public class SeriesScanUtil {
     return !(hasNextPage() || hasNextChunk() || hasNextFile());
   }
 
-  boolean hasNextFile() throws IOException {
+  public boolean hasNextFile() throws IOException {
 
     if (!unSeqPageReaders.isEmpty()
         || firstPageReader != null
@@ -250,7 +259,7 @@ public class SeriesScanUtil {
    * This method should be called after hasNextFile() until no next chunk, make sure that all
    * overlapped chunks are consumed
    */
-  boolean hasNextChunk() throws IOException {
+  public boolean hasNextChunk() throws IOException {
 
     if (!unSeqPageReaders.isEmpty()
         || firstPageReader != null
@@ -383,7 +392,7 @@ public class SeriesScanUtil {
    */
   @SuppressWarnings("squid:S3776")
   // Suppress high Cognitive Complexity warning
-  boolean hasNextPage() throws IOException {
+  public boolean hasNextPage() throws IOException {
 
     /*
      * has overlapped data before
@@ -601,7 +610,7 @@ public class SeriesScanUtil {
   }
 
   /** This method should only be used when the method isPageOverlapped() return true. */
-  TsBlock nextPage() throws IOException {
+  public TsBlock nextPage() throws IOException {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index f8de63034c..5ce05c1816 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -48,7 +48,7 @@ public class QueryContext {
    */
   private final Map<String, List<Modification>> fileModCache = new HashMap<>();
 
-  private long queryId;
+  protected long queryId;
 
   private long queryTimeLowerBound = Long.MIN_VALUE;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index c0064917f8..f0ee6155a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -24,14 +24,16 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.reader.IDataBlockReader;
+import org.apache.iotdb.db.engine.compaction.reader.SeriesDataBlockReader;
 import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -41,8 +43,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -99,26 +101,27 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
             COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
             "s1",
             new MeasurementSchema("s1", TSDataType.INT64));
-    IBatchReader tsFilesReader =
-        new SeriesRawDataBatchReader(
+    IDataBlockReader tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.INT64,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             seqResources,
             unseqResources,
-            null,
-            null,
             true);
     int count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        assertEquals(batchData.currentTime(), batchData.currentValue());
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+      while (iterator.hasNext()) {
+        assertEquals(iterator.currentTime(), iterator.currentValue());
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+
+    tsBlockReader.close();
     assertEquals(500, count);
 
     List<TsFileResource> targetResources =
@@ -129,26 +132,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
     performer.perform();
     CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
 
-    tsFilesReader =
-        new SeriesRawDataBatchReader(
+    tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.INT64,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             targetResources,
             new ArrayList<>(),
-            null,
-            null,
             true);
+
     count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        assertEquals(batchData.currentTime(), batchData.currentValue());
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+      while (iterator.hasNext()) {
+        assertEquals(iterator.currentTime(), iterator.currentValue());
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+
+    tsBlockReader.close();
     assertEquals(500, count);
   }
 
@@ -175,30 +180,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < 3) {
@@ -240,30 +246,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < 3 && j < 5) {
@@ -290,26 +297,26 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s1",
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            assertEquals(batchData.currentTime(), batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            assertEquals(iterator.currentTime(), iterator.currentValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
@@ -329,26 +336,26 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s1",
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            assertEquals(batchData.currentTime(), batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            assertEquals(iterator.currentTime(), iterator.currentValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
@@ -380,34 +387,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if ((100 <= batchData.currentTime() && batchData.currentTime() < 170)
-                || (270 <= batchData.currentTime() && batchData.currentTime() < 340)) {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
-            } else if ((200 <= batchData.currentTime() && batchData.currentTime() < 270)
-                || (370 <= batchData.currentTime() && batchData.currentTime() < 440)) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
+                || (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+            } else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
+                || (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(410, count);
         } else if (i < 3 && j < 5) {
@@ -437,34 +444,35 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if ((100 <= batchData.currentTime() && batchData.currentTime() < 170)
-                || (270 <= batchData.currentTime() && batchData.currentTime() < 340)) {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
-            } else if ((200 <= batchData.currentTime() && batchData.currentTime() < 270)
-                || (370 <= batchData.currentTime() && batchData.currentTime() < 440)) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if ((100 <= iterator.currentTime() && iterator.currentTime() < 170)
+                || (270 <= iterator.currentTime() && iterator.currentTime() < 340)) {
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+            } else if ((200 <= iterator.currentTime() && iterator.currentTime() < 270)
+                || (370 <= iterator.currentTime() && iterator.currentTime() < 440)) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(410, count);
         } else if (i < 3 && j < 5) {
@@ -522,33 +530,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
-            } else if (batchData.currentTime() < 850) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+              assertEquals(iterator.currentTime(), iterator.currentValue());
+            } else if (iterator.currentTime() < 850) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -575,33 +583,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
-            } else if (batchData.currentTime() < 850) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+              assertEquals(iterator.currentTime(), iterator.currentValue());
+            } else if (iterator.currentTime() < 850) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 5)) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -650,33 +658,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
-            } else if (batchData.currentTime() < 850) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+              assertEquals(iterator.currentTime(), iterator.currentValue());
+            } else if (iterator.currentTime() < 850) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -703,33 +711,33 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
-            } else if (batchData.currentTime() < 850) {
-              assertEquals(batchData.currentTime() + 100, batchData.currentValue());
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
+              assertEquals(iterator.currentTime(), iterator.currentValue());
+            } else if (iterator.currentTime() < 850) {
+              assertEquals(iterator.currentTime() + 100, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime() + 200, batchData.currentValue());
+              assertEquals(iterator.currentTime() + 200, iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -780,25 +788,25 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(0, count);
       }
     }
@@ -817,25 +825,25 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(0, count);
       }
     }
@@ -860,28 +868,29 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
+
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             assertEquals(
-                batchData.currentTime(),
-                ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                iterator.currentTime(),
+                ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
@@ -904,28 +913,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             assertEquals(
-                batchData.currentTime(),
-                ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                iterator.currentTime(),
+                ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
@@ -959,34 +968,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1015,34 +1024,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1080,34 +1089,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1136,34 +1145,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 false);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() >= 600) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() >= 600) {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(400, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1202,39 +1211,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(1450, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1263,39 +1272,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(1450, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 5) {
@@ -1373,39 +1382,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -1439,39 +1448,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -1533,39 +1542,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
           assertEquals(0, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
@@ -1596,39 +1605,39 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
-            if (batchData.currentTime() < 200
-                || (batchData.currentTime() < 550 && batchData.currentTime() >= 500)) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
+            if (iterator.currentTime() < 200
+                || (iterator.currentTime() < 550 && iterator.currentTime() >= 500)) {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
-            } else if (batchData.currentTime() < 850) {
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
+            } else if (iterator.currentTime() < 850) {
               assertEquals(
-                  batchData.currentTime() + 100,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 100,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime() + 200,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 200,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == TsFileGeneratorUtils.getAlignDeviceOffset()) {
           assertEquals(0, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
@@ -1661,28 +1670,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             assertEquals(
-                batchData.currentTime(),
-                ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                iterator.currentTime(),
+                ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
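
    The aligned-series hunks repeat the same structure, but obtain rows through getTsBlockAlignedRowIterator and assert that the first column of each row equals its timestamp. A minimal sketch of that verification follows; the helper name verifyAlignedPoints and its throws clause are assumed for illustration only, with imports as in the test class (including TsPrimitiveType and the static assertEquals).

        // Illustrative sketch: verify that, for an aligned series, the first value
        // of every row equals its timestamp, as asserted in the hunks above.
        private static int verifyAlignedPoints(IDataBlockReader reader) throws IOException {
          int count = 0;
          while (reader.hasNextBatch()) {
            TsBlock block = reader.nextBatch();
            IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
            while (iterator.hasNext()) {
              assertEquals(
                  iterator.currentTime(),
                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
              count++;
              iterator.next();
            }
          }
          reader.close();
          return count;
        }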
@@ -1705,28 +1714,28 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 new ArrayList<>(),
                 targetResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             assertEquals(
-                batchData.currentTime(),
-                ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                iterator.currentTime(),
+                ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         assertEquals(500, count);
       }
     }
@@ -1755,31 +1764,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
             COMPACTION_TEST_SG + PATH_SEPARATOR + "d1",
             "s1",
             new MeasurementSchema("s1", TSDataType.INT64));
-    IBatchReader tsFilesReader =
-        new SeriesRawDataBatchReader(
+    IDataBlockReader tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.INT64,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             seqResources,
             unseqResources,
-            null,
-            null,
             true);
     int count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        if (batchData.currentTime() % 100 < 50) {
-          assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+      while (iterator.hasNext()) {
+        if (iterator.currentTime() % 100 < 50) {
+          assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
         } else {
-          assertEquals(batchData.currentTime(), batchData.currentValue());
+          assertEquals(iterator.currentTime(), iterator.currentValue());
         }
 
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+    tsBlockReader.close();
     assertEquals(500, count);
 
     List<TsFileResource> targetResources =
@@ -1790,31 +1799,31 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
     performer.perform();
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
-    tsFilesReader =
-        new SeriesRawDataBatchReader(
+    tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.INT64,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             targetResources,
             new ArrayList<>(),
-            null,
-            null,
             true);
     count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        if (batchData.currentTime() % 100 < 50) {
-          assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+      while (iterator.hasNext()) {
+        if (iterator.currentTime() % 100 < 50) {
+          assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
         } else {
-          assertEquals(batchData.currentTime(), batchData.currentValue());
+          assertEquals(iterator.currentTime(), iterator.currentValue());
         }
 
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+    tsBlockReader.close();
     assertEquals(500, count);
   }
 
@@ -1851,37 +1860,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(1280, count);
         } else if (i < 1 && j < 4) {
@@ -1946,45 +1955,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (measurementMaxTime.get(
                     COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
-                >= batchData.currentTime()) {
+                >= iterator.currentTime()) {
               Assert.fail();
             }
             measurementMaxTime.put(
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
-                batchData.currentTime());
+                iterator.currentTime());
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 2 && j < 3) {
           assertEquals(1280, count);
         } else if (i < 1 && j < 4) {
@@ -2046,37 +2055,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -2142,45 +2151,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (measurementMaxTime.get(
                     COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
-                >= batchData.currentTime()) {
+                >= iterator.currentTime()) {
               Assert.fail();
             }
             measurementMaxTime.put(
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
-                batchData.currentTime());
+                iterator.currentTime());
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == 0 && j == 0) || (i == 0 && j == 1) || (i == 2 && j == 4) || (i == 3 && j == 4)) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -2244,37 +2253,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 2) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -2334,45 +2343,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (measurementMaxTime.get(
                     COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
-                >= batchData.currentTime()) {
+                >= iterator.currentTime()) {
               Assert.fail();
             }
             measurementMaxTime.put(
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
-                batchData.currentTime());
+                iterator.currentTime());
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 2) {
           assertEquals(0, count);
         } else if (i < 2 && j < 3) {
@@ -2434,37 +2443,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         } else {
@@ -2507,45 +2516,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (measurementMaxTime.get(
                     COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
-                >= batchData.currentTime()) {
+                >= iterator.currentTime()) {
               Assert.fail();
             }
             measurementMaxTime.put(
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
-                batchData.currentTime());
+                iterator.currentTime());
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         } else {
@@ -2599,37 +2608,37 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 1) {
           if (j < 4) {
             assertEquals(630, count);
@@ -2709,45 +2718,45 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 "s" + j,
                 new MeasurementSchema("s" + j, TSDataType.INT64));
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.INT64,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
             if (measurementMaxTime.get(
                     COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
-                >= batchData.currentTime()) {
+                >= iterator.currentTime()) {
               Assert.fail();
             }
             measurementMaxTime.put(
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
-                batchData.currentTime());
+                iterator.currentTime());
             if (i == 0
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
-              assertEquals(batchData.currentTime() + 20000, batchData.currentValue());
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+              assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
             } else if ((i < 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
-              assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+              assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
             } else {
-              assertEquals(batchData.currentTime(), batchData.currentValue());
+              assertEquals(iterator.currentTime(), iterator.currentValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < 1) {
           if (j < 4) {
             assertEquals(630, count);
@@ -2792,34 +2801,34 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
             COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000",
             Collections.singletonList("s1"),
             schemas);
-    IBatchReader tsFilesReader =
-        new SeriesRawDataBatchReader(
+    IDataBlockReader tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.VECTOR,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             seqResources,
             unseqResources,
-            null,
-            null,
             true);
     int count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        if (batchData.currentTime() % 100 < 50) {
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+      while (iterator.hasNext()) {
+        if (iterator.currentTime() % 100 < 50) {
           assertEquals(
-              batchData.currentTime() + 10000,
-              ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+              iterator.currentTime() + 10000,
+              ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
         } else {
           assertEquals(
-              batchData.currentTime(),
-              ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+              iterator.currentTime(),
+              ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
         }
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+    tsBlockReader.close();
     assertEquals(500, count);
 
     List<TsFileResource> targetResources =
@@ -2830,35 +2839,35 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
     performer.perform();
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
-    tsFilesReader =
-        new SeriesRawDataBatchReader(
+    tsBlockReader =
+        new SeriesDataBlockReader(
             path,
             TSDataType.INT64,
-            EnvironmentUtils.TEST_QUERY_CONTEXT,
+            FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
             targetResources,
             new ArrayList<>(),
-            null,
-            null,
             true);
     count = 0;
-    while (tsFilesReader.hasNextBatch()) {
-      BatchData batchData = tsFilesReader.nextBatch();
-      while (batchData.hasCurrent()) {
-        if (batchData.currentTime() % 100 < 50) {
+    while (tsBlockReader.hasNextBatch()) {
+      TsBlock block = tsBlockReader.nextBatch();
+      IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+      while (iterator.hasNext()) {
+        if (iterator.currentTime() % 100 < 50) {
           assertEquals(
-              batchData.currentTime() + 10000,
-              ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+              iterator.currentTime() + 10000,
+              ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
         } else {
           assertEquals(
-              batchData.currentTime(),
-              ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+              iterator.currentTime(),
+              ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
         }
 
         count++;
-        batchData.next();
+        iterator.next();
       }
     }
-    tsFilesReader.close();
+    tsBlockReader.close();
     assertEquals(500, count);
   }
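
For reference, a minimal sketch of the point-reading pattern this patch applies throughout the test. The reader construction and iteration calls are taken verbatim from the hunks above; the `path`, `seqResources`, `unseqResources` and `queryId` variables, the `PartialPath` parameter type, and the helper name `countPoints` are assumptions for illustration only, not part of the patch:

    // Assumed imports: the same ones this patch adds to the test class
    // (IDataBlockReader, SeriesDataBlockReader, FragmentInstanceContext, TsBlock,
    //  IBatchDataIterator, TSDataType), plus PartialPath, TsFileResource and java.util.List.
    private long countPoints(
        PartialPath path,                      // assumption: a non-aligned measurement path
        List<TsFileResource> seqResources,
        List<TsFileResource> unseqResources,
        long queryId)
        throws Exception {
      IDataBlockReader reader =
          new SeriesDataBlockReader(
              path,
              TSDataType.INT64,
              FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId),
              seqResources,
              unseqResources,
              true);
      long count = 0;
      try {
        while (reader.hasNextBatch()) {
          TsBlock block = reader.nextBatch();
          // for aligned series the tests call block.getTsBlockAlignedRowIterator() instead
          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
          while (iterator.hasNext()) {
            // iterator.currentTime() / iterator.currentValue() expose each point,
            // exactly as asserted in the hunks above
            count++;
            iterator.next();
          }
        }
      } finally {
        reader.close();
      }
      return count;
    }
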
 
@@ -2899,43 +2908,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(1280, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
@@ -2971,43 +2980,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j < 3) {
           assertEquals(1280, count);
         } else if (i < TsFileGeneratorUtils.getAlignDeviceOffset() + 1 && j < 4) {
@@ -3097,43 +3106,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -3203,43 +3212,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if ((i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 0)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() && j == 1)
             || (i == TsFileGeneratorUtils.getAlignDeviceOffset() + 2 && j == 4)
@@ -3335,43 +3344,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         }
@@ -3412,43 +3421,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         }
@@ -3548,43 +3557,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 seqResources,
                 unseqResources,
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         }
@@ -3625,43 +3634,43 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                 COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                 Collections.singletonList("s" + j),
                 schemas);
-        IBatchReader tsFilesReader =
-            new SeriesRawDataBatchReader(
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
                 path,
                 TSDataType.VECTOR,
-                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                 targetResources,
                 new ArrayList<>(),
-                null,
-                null,
                 true);
         int count = 0;
-        while (tsFilesReader.hasNextBatch()) {
-          BatchData batchData = tsFilesReader.nextBatch();
-          while (batchData.hasCurrent()) {
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockAlignedRowIterator();
+          while (iterator.hasNext()) {
             if (i == TsFileGeneratorUtils.getAlignDeviceOffset()
-                && ((450 <= batchData.currentTime() && batchData.currentTime() < 550)
-                    || (550 <= batchData.currentTime() && batchData.currentTime() < 650))) {
+                && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+                    || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
               assertEquals(
-                  batchData.currentTime() + 20000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 20000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else if ((i < TsFileGeneratorUtils.getAlignDeviceOffset() + 3 && j < 4)
-                && ((20 <= batchData.currentTime() && batchData.currentTime() < 220)
-                    || (250 <= batchData.currentTime() && batchData.currentTime() < 450)
-                    || (480 <= batchData.currentTime() && batchData.currentTime() < 680))) {
+                && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+                    || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+                    || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
               assertEquals(
-                  batchData.currentTime() + 10000,
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime() + 10000,
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             } else {
               assertEquals(
-                  batchData.currentTime(),
-                  ((TsPrimitiveType[]) (batchData.currentValue()))[0].getValue());
+                  iterator.currentTime(),
+                  ((TsPrimitiveType[]) (iterator.currentValue()))[0].getValue());
             }
             count++;
-            batchData.next();
+            iterator.next();
           }
         }
-        tsFilesReader.close();
+        tsBlockReader.close();
         if (i == 0 || i == 1 || i == 2) {
           assertEquals(0, count);
         }
@@ -3811,30 +3820,30 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
                   COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
                   "s" + j,
                   new MeasurementSchema("s" + j, TSDataType.INT64));
-          IBatchReader tsFilesReader =
-              new SeriesRawDataBatchReader(
+          IDataBlockReader tsBlockReader =
+              new SeriesDataBlockReader(
                   path,
                   TSDataType.INT64,
-                  EnvironmentUtils.TEST_QUERY_CONTEXT,
+                  FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                      EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
                   targetResources,
                   new ArrayList<>(),
-                  null,
-                  null,
                   true);
           int count = 0;
-          while (tsFilesReader.hasNextBatch()) {
-            BatchData batchData = tsFilesReader.nextBatch();
-            while (batchData.hasCurrent()) {
-              if (batchData.currentTime() < 20) {
-                assertEquals(batchData.currentTime(), batchData.currentValue());
+          while (tsBlockReader.hasNextBatch()) {
+            TsBlock block = tsBlockReader.nextBatch();
+            IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+            while (iterator.hasNext()) {
+              if (iterator.currentTime() < 20) {
+                assertEquals(iterator.currentTime(), iterator.currentValue());
               } else {
-                assertEquals(batchData.currentTime() + 10000, batchData.currentValue());
+                assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
               }
               count++;
-              batchData.next();
+              iterator.next();
             }
           }
-          tsFilesReader.close();
+          tsBlockReader.close();
           if (i < 2 && j < 3) {
             assertEquals(920, count);
           } else {