You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 06:33:34 UTC

[iotdb] branch ty-mpp created (now e61858d)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at e61858d  TsBlock pre implementation

This branch includes the following new commits:

     new e61858d  TsBlock pre implementation

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: TsBlock pre implementation

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e61858d38da8e6753a5f1ba314c3aaab806a4f18
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 11:35:54 2022 +0800

    TsBlock pre implementation
---
 pom.xml                                            |   5 +
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |   2 +-
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  |   2 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   2 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   2 +-
 .../db/mpp/operator/process/AggregateOperator.java |   2 +-
 .../mpp/operator/process/DeviceMergeOperator.java  |   2 +-
 .../db/mpp/operator/process/FillOperator.java      |   2 +-
 .../mpp/operator/process/FilterNullOperator.java   |   2 +-
 .../mpp/operator/process/GroupByLevelOperator.java |   2 +-
 .../db/mpp/operator/process/LimitOperator.java     |   6 +-
 .../db/mpp/operator/process/OffsetOperator.java    |   2 +-
 .../db/mpp/operator/process/SortOperator.java      |   2 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  36 ++-
 .../db/mpp/operator/sink/FragmentSinkOperator.java |   2 +-
 .../iotdb/db/mpp/operator/sink/SinkOperator.java   |   2 +-
 .../mpp/operator/source/AlignedSeriesScanUtil.java |  88 ++++++
 .../source/SeriesAggregateScanOperator.java        |   2 +-
 .../db/mpp/operator/source/SeriesScanOperator.java |   4 +-
 .../db/mpp/operator/source/SeriesScanUtil.java     |  56 ++--
 .../query/reader/chunk/MemAlignedPageReader.java   |  38 ++-
 .../iotdb/db/query/reader/chunk/MemPageReader.java | 113 +++++---
 tsfile/pom.xml                                     |   4 +
 .../apache/iotdb/tsfile/read/common/Column.java    |  21 --
 .../iotdb/tsfile/read/common/TimeColumn.java       |  21 --
 .../apache/iotdb/tsfile/read/common/TsBlock.java   | 175 ------------
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 317 +++++++++++++++++++++
 .../tsfile/read/common/block/TsBlockBuilder.java   | 295 +++++++++++++++++++
 .../read/common/block/TsBlockBuilderStatus.java    |  75 +++++
 .../read/common/{ => block}/TsBlockMetadata.java   |   2 +-
 .../read/common/block/column/BinaryColumn.java     | 110 +++++++
 .../common/block/column/BinaryColumnBuilder.java   | 148 ++++++++++
 .../read/common/block/column/BooleanColumn.java    | 109 +++++++
 .../common/block/column/BooleanColumnBuilder.java  | 150 ++++++++++
 .../tsfile/read/common/block/column/Column.java    |  87 ++++++
 .../read/common/block/column/ColumnBuilder.java    |  81 ++++++
 .../common/block/column/ColumnBuilderStatus.java   |  88 ++++++
 .../read/common/block/column/ColumnUtil.java       |  97 +++++++
 .../read/common/block/column/DoubleColumn.java     | 109 +++++++
 .../common/block/column/DoubleColumnBuilder.java   | 150 ++++++++++
 .../read/common/block/column/FloatColumn.java      | 108 +++++++
 .../common/block/column/FloatColumnBuilder.java    | 150 ++++++++++
 .../tsfile/read/common/block/column/IntColumn.java | 108 +++++++
 .../read/common/block/column/IntColumnBuilder.java | 150 ++++++++++
 .../read/common/block/column/LongColumn.java       | 108 +++++++
 .../common/block/column/LongColumnBuilder.java     | 150 ++++++++++
 .../block/column/RunLengthEncodedColumn.java       | 133 +++++++++
 .../read/common/block/column/TimeColumn.java       |  95 ++++++
 .../common/block/column/TimeColumnBuilder.java     | 126 ++++++++
 .../iotdb/tsfile/read/reader/IPageReader.java      |   2 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |  38 ++-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  81 ++++--
 52 files changed, 3323 insertions(+), 339 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3f1c875..f19c018 100644
--- a/pom.xml
+++ b/pom.xml
@@ -506,6 +506,11 @@
                 <version>${airline.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.openjdk.jol</groupId>
+                <artifactId>jol-core</artifactId>
+                <version>0.2</version>
+            </dependency>
+            <dependency>
                 <groupId>org.fusesource.mqtt-client</groupId>
                 <artifactId>mqtt-client</artifactId>
                 <version>1.12</version>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 0d64cbc..6b4e07c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index b493759..ef2919c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 0216556..5f8607e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index d545c4f..df89218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index c439ad4..a08fb68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index b9fce3a..ebfb54e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index e2732cf..52a3fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index d8a33ba..0083ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 6e19233..285427a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 47c81bc..25beb6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -56,8 +56,8 @@ public class LimitOperator implements ProcessOperator {
   public TsBlock next() throws IOException {
     TsBlock block = child.next();
     TsBlock res = block;
-    if (block.getCount() <= remainingLimit) {
-      remainingLimit -= block.getCount();
+    if (block.getPositionCount() <= remainingLimit) {
+      remainingLimit -= block.getPositionCount();
     } else {
       res = block.getRegion(0, (int) remainingLimit);
       remainingLimit = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index e9eef20..a22985c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index f0ac49b..5e2f0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index c5665ee..60fa4f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -22,8 +22,13 @@ import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.read.common.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -48,11 +53,14 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private final int columnCount;
 
+  private final List<TSDataType> dataTypes;
+
   public TimeJoinOperator(
       OperatorContext operatorContext,
       List<Operator> children,
       OrderBy mergeOrder,
-      int columnCount) {
+      int columnCount,
+      List<TSDataType> dataTypes) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.inputCount = children.size();
@@ -61,6 +69,7 @@ public class TimeJoinOperator implements ProcessOperator {
     this.noMoreTsBlocks = new boolean[this.inputCount];
     this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
     this.columnCount = columnCount;
+    this.dataTypes = dataTypes;
   }
 
   @Override
@@ -92,7 +101,7 @@ public class TimeJoinOperator implements ProcessOperator {
         inputIndex[i] = 0;
         inputTsBlocks[i] = children.get(i).next();
         if (!empty(i)) {
-          int rowSize = inputTsBlocks[i].getCount();
+          int rowSize = inputTsBlocks[i].getPositionCount();
           for (int row = 0; row < rowSize; row++) {
             timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
           }
@@ -113,22 +122,29 @@ public class TimeJoinOperator implements ProcessOperator {
       return null;
     }
 
-    TsBlock res = new TsBlock(columnCount);
+    TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) {
-      res.addTime(timeSelector.pollFirst());
+      timeBuilder.writeLong(timeSelector.pollFirst());
     }
 
+    tsBlockBuilder.buildValueColumnBuilders(dataTypes);
+
     for (int i = 0, column = 0; i < inputCount; i++) {
       TsBlock block = inputTsBlocks[i];
       TimeColumn timeColumn = block.getTimeColumn();
       int valueColumnCount = block.getValueColumnCount();
       int startIndex = inputIndex[i];
       for (int j = 0; j < valueColumnCount; j++) {
-        inputIndex[i] =
-            res.addValues(column++, timeColumn, block.getColumn(j), startIndex, currentEndTime);
+        startIndex = inputIndex[i];
+        ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(column++);
+        Column valueColumn = block.getColumn(j);
+        startIndex = columnBuilder.appendColumn(timeColumn, valueColumn, startIndex, timeBuilder);
       }
+      inputIndex[i] = startIndex;
     }
-    return res;
+    return tsBlockBuilder.build();
   }
 
   @Override
@@ -156,6 +172,6 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private boolean empty(int columnIndex) {
     return inputTsBlocks[columnIndex] == null
-        || inputTsBlocks[columnIndex].getCount() == inputIndex[columnIndex];
+        || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex];
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index 7be5cfa..7bbccbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.sink;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index e5a3e15..b691db0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.sink;
 
 import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 public interface SinkOperator extends Operator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
new file mode 100644
index 0000000..9fd48c7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class AlignedSeriesScanUtil extends SeriesScanUtil {
+
+  private final List<TSDataType> dataTypes;
+
+  public AlignedSeriesScanUtil(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      FragmentInstanceContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    super(
+        seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter, ascending);
+    dataTypes =
+        ((AlignedPath) seriesPath)
+            .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+  }
+
+  @Override
+  protected PriorityMergeReader getPriorityMergeReader() {
+    return new AlignedPriorityMergeReader();
+  }
+
+  @Override
+  protected DescPriorityMergeReader getDescPriorityMergeReader() {
+    return new AlignedDescPriorityMergeReader();
+  }
+
+  @Override
+  protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, (AlignedPath) seriesPath, context, filter);
+  }
+
+  @Override
+  protected List<TSDataType> getTsDataTypeList() {
+    return dataTypes;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 0ef6b09..87151ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 19aa191..e84e649 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -127,6 +127,6 @@ public class SeriesScanOperator implements Operator {
   }
 
   private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || !tsBlock.hasNext();
+    return tsBlock == null || tsBlock.isEmpty();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 9e774dc..e3d2f9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -35,7 +35,9 @@ import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -389,7 +391,7 @@ public class SeriesScanUtil {
     } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
       if (hasNextOverlappedPage()) {
         cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
           hasCachedNextOverlappedPage = true;
           return true;
         }
@@ -440,7 +442,7 @@ public class SeriesScanUtil {
        */
       if (hasNextOverlappedPage()) {
         cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
           hasCachedNextOverlappedPage = true;
           return true;
         }
@@ -644,7 +646,8 @@ public class SeriesScanUtil {
       if (mergeReader.hasNextTimeValuePair()) {
 
         // TODO we still need to consider data type, ascending and descending here
-        cachedTsBlock = new TsBlock();
+        TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
+        TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
         long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
         while (mergeReader.hasNextTimeValuePair()) {
 
@@ -662,7 +665,7 @@ public class SeriesScanUtil {
              * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
              * we could use the first sequence page reader later
              */
-            if (cachedTsBlock.hasNext() || firstPageReader != null || !seqPageReaders.isEmpty()) {
+            if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) {
               break;
             }
             // so, we don't have other data except mergeReader
@@ -689,7 +692,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < firstPageReader.getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+              hasCachedNextOverlappedPage = !builder.isEmpty();
+              cachedTsBlock = builder.build();
               return hasCachedNextOverlappedPage;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
@@ -716,7 +720,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < seqPageReaders.get(0).getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+              hasCachedNextOverlappedPage = !builder.isEmpty();
+              cachedTsBlock = builder.build();
               return hasCachedNextOverlappedPage;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
@@ -752,37 +757,40 @@ public class SeriesScanUtil {
 
           if (valueFilter == null
               || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            builder.declarePosition();
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
             switch (dataType) {
               case BOOLEAN:
-                //            tsBlock.putBoolean(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getBoolean());
+                builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
                 break;
               case INT32:
-                //            tsBlock.putInt(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getInt());
+                builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
                 break;
               case INT64:
-                //            tsBlock.putLong(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getLong());
+                builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
                 break;
               case FLOAT:
-                //            tsBlock.putFloat(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getFloat());
+                builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
                 break;
               case DOUBLE:
-                //            tsBlock.putDouble(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getDouble());
+                builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
                 break;
               case TEXT:
-                //            tsBlock.putBinary(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getBinary());
+                builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+                break;
+              case VECTOR:
+                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                for (int i = 0; i < values.length; i++) {
+                  builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                }
                 break;
               default:
                 throw new UnSupportedDataTypeException(String.valueOf(dataType));
             }
           }
         }
-        hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+        hasCachedNextOverlappedPage = builder.isEmpty();
+        cachedTsBlock = builder.build();
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
          */
@@ -1006,7 +1014,7 @@ public class SeriesScanUtil {
     }
   }
 
-  protected void unpackSeqTsFileResource() throws IOException {
+  private void unpackSeqTsFileResource() throws IOException {
     ITimeSeriesMetadata timeseriesMetadata =
         loadTimeSeriesMetadata(
             orderUtils.getNextSeqFileResource(true),
@@ -1020,7 +1028,7 @@ public class SeriesScanUtil {
     }
   }
 
-  protected void unpackUnseqTsFileResource() throws IOException {
+  private void unpackUnseqTsFileResource() throws IOException {
     ITimeSeriesMetadata timeseriesMetadata =
         loadTimeSeriesMetadata(
             orderUtils.getNextUnseqFileResource(true),
@@ -1046,6 +1054,10 @@ public class SeriesScanUtil {
         resource, seriesPath, context, filter, allSensors);
   }
 
+  protected List<TSDataType> getTsDataTypeList() {
+    return Collections.singletonList(dataType);
+  }
+
   protected Filter getAnyFilter() {
     return timeFilter != null ? timeFilter : valueFilter;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 7216ea7..e7ebc1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.query.reader.chunk;
 
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -33,6 +35,7 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
 
 public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
 
@@ -81,8 +84,37 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
-    // TODO need to implement for mpp
-    throw new IllegalStateException("We have not implemented this method.");
+    // TODO change from the row-based style to column-based style
+    TsBlockBuilder builder =
+        new TsBlockBuilder(
+            chunkMetadata.getValueChunkMetadataList().stream()
+                .map(IChunkMetadata::getDataType)
+                .collect(Collectors.toList()));
+    while (timeValuePairIterator.hasNextTimeValuePair()) {
+      TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+      TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+      // save the first not null value of each row
+      Object firstNotNullObject = null;
+      for (TsPrimitiveType value : values) {
+        if (value != null) {
+          firstNotNullObject = value.getValue();
+          break;
+        }
+      }
+      // if all the sub sensors' value are null in current time
+      // or current row is not satisfied with the filter, just discard it
+      // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will only
+      // accept AlignedPath with only one sub sensor
+      if (firstNotNullObject != null
+          && (valueFilter == null
+              || valueFilter.satisfy(timeValuePair.getTimestamp(), firstNotNullObject))) {
+        builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
+        for (int i = 0; i < values.length; i++) {
+          builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+        }
+      }
+    }
+    return builder.build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 79997b5..16704a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -25,13 +25,17 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
+import java.util.Collections;
 
 public class MemPageReader implements IPageReader {
 
@@ -66,43 +70,80 @@ public class MemPageReader implements IPageReader {
     TSDataType dataType = chunkMetadata.getDataType();
     // TODO we still need to consider data type, ascending and descending here
 
-    TsBlock tsBlock = new TsBlock();
-    while (timeValuePairIterator.hasNextTimeValuePair()) {
-      TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
-      if (valueFilter == null
-          || valueFilter.satisfy(
-              timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
-        switch (dataType) {
-          case BOOLEAN:
-            //            tsBlock.putBoolean(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getBoolean());
-            break;
-          case INT32:
-            //            tsBlock.putInt(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getInt());
-            break;
-          case INT64:
-            //            tsBlock.putLong(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getLong());
-            break;
-          case FLOAT:
-            //            tsBlock.putFloat(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getFloat());
-            break;
-          case DOUBLE:
-            //            tsBlock.putDouble(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getDouble());
-            break;
-          case TEXT:
-            //            tsBlock.putBinary(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getBinary());
-            break;
-          default:
-            throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
+    switch (dataType) {
+      case BOOLEAN:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeBoolean(timeValuePair.getValue().getBoolean());
+          }
         }
-      }
+        break;
+      case INT32:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeInt(timeValuePair.getValue().getInt());
+          }
+        }
+        break;
+      case INT64:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeLong(timeValuePair.getValue().getLong());
+          }
+        }
+        break;
+      case FLOAT:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeFloat(timeValuePair.getValue().getFloat());
+          }
+        }
+        break;
+      case DOUBLE:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeDouble(timeValuePair.getValue().getDouble());
+          }
+        }
+        break;
+      case TEXT:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeBinary(timeValuePair.getValue().getBinary());
+          }
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
-    return tsBlock;
+    return tsBlockBuilder.build();
   }
 
   @Override
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index beb2149..147ed65 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -68,6 +68,10 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
deleted file mode 100644
index c4933e1..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public interface Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
deleted file mode 100644
index 19937f6..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public class TimeColumn implements Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
deleted file mode 100644
index a6bffbf..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
-
-import static java.lang.String.format;
-
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
-
-  // Describe the column info
-  private TsBlockMetadata metadata;
-
-  private TimeColumn timeColumn;
-
-  private Column[] valueColumns;
-
-  private int count;
-
-  public TsBlock() {}
-
-  public TsBlock(int columnCount) {
-    timeColumn = new TimeColumn();
-    valueColumns = new Column[columnCount];
-  }
-
-  public boolean hasNext() {
-    return false;
-  }
-
-  public void next() {}
-
-  public TsBlockMetadata getMetadata() {
-    return metadata;
-  }
-
-  public int getCount() {
-    return count;
-  }
-
-  /** TODO need to be implemented after the data structure being defined */
-  public long getEndTime() {
-    return -1;
-  }
-
-  public boolean isEmpty() {
-    return count == 0;
-  }
-
-  /**
-   * TODO has not been implemented yet
-   *
-   * @param positionOffset start offset
-   * @param length slice length
-   * @return view of current TsBlock start from positionOffset to positionOffset + length
-   */
-  public TsBlock getRegion(int positionOffset, int length) {
-    if (positionOffset < 0 || length < 0 || positionOffset + length > count) {
-      throw new IndexOutOfBoundsException(
-          format(
-              "Invalid position %s and length %s in page with %s positions",
-              positionOffset, length, count));
-    }
-    return this;
-  }
-
-  /** TODO need to be implemented after the data structure being defined */
-  public long getTimeByIndex(int index) {
-    return -1;
-  }
-
-  public void addTime(long time) {}
-
-  public int getValueColumnCount() {
-    return valueColumns.length;
-  }
-
-  public TimeColumn getTimeColumn() {
-    return timeColumn;
-  }
-
-  public Column getColumn(int columnIndex) {
-    return valueColumns[columnIndex];
-  }
-
-  public int addValues(
-      int columnIndex, TimeColumn timeColumn, Column valueColumn, int rowIndex, long endTime) {
-
-    return rowIndex;
-  }
-
-  public TsBlockIterator getTsBlockIterator() {
-    return new TsBlockIterator();
-  }
-
-  // TODO need to be implemented when the data structure is defined
-  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
-
-    @Override
-    public boolean hasNext() {
-      return TsBlock.this.hasNext();
-    }
-
-    @Override
-    public boolean hasNext(long minBound, long maxBound) {
-      return hasNext();
-    }
-
-    @Override
-    public void next() {
-      TsBlock.this.next();
-    }
-
-    @Override
-    public long currentTime() {
-      return -1;
-    }
-
-    @Override
-    public Object currentValue() {
-      return null;
-    }
-
-    @Override
-    public void reset() {}
-
-    @Override
-    public int totalLength() {
-      return TsBlock.this.getCount();
-    }
-
-    @Override
-    public boolean hasNextTimeValuePair() {
-      return hasNext();
-    }
-
-    @Override
-    public TimeValuePair nextTimeValuePair() {
-      return null;
-    }
-
-    @Override
-    public TimeValuePair currentTimeValuePair() {
-      return null;
-    }
-
-    @Override
-    public void close() {}
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
new file mode 100644
index 0000000..9c38360
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
+ * and constructs them as a row based view The columns can be series, aggregation result for one
+ * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
+ * the columns.
+ */
+public class TsBlock {
+
+  public static final int INSTANCE_SIZE = ClassLayout.parseClass(TsBlock.class).instanceSize();
+
+  private static final Column[] EMPTY_COLUMNS = new Column[0];
+
+  /**
+   * Visible to give trusted classes like {@link TsBlockBuilder} access to a constructor that
+   * doesn't defensively copy the valueColumns
+   */
+  static TsBlock wrapBlocksWithoutCopy(
+      int positionCount, TimeColumn timeColumn, Column[] valueColumns) {
+    return new TsBlock(false, positionCount, timeColumn, valueColumns);
+  }
+
+  // TODO rethink about if we really need this field
+  // Describe the column info
+  private TsBlockMetadata metadata;
+
+  private final TimeColumn timeColumn;
+
+  private final Column[] valueColumns;
+
+  private final int positionCount;
+
+  private volatile long retainedSizeInBytes = -1;
+
+  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+  }
+
+  public TsBlock(int positionCount) {
+    this(false, positionCount, null, EMPTY_COLUMNS);
+  }
+
+  public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
+    this(true, positionCount, timeColumn, valueColumns);
+  }
+
+  private TsBlock(
+      boolean columnsCopyRequired,
+      int positionCount,
+      TimeColumn timeColumn,
+      Column[] valueColumns) {
+    requireNonNull(valueColumns, "blocks is null");
+    this.positionCount = positionCount;
+    this.timeColumn = timeColumn;
+    if (valueColumns.length == 0) {
+      this.valueColumns = EMPTY_COLUMNS;
+      // Empty blocks are not considered "retained" by any particular page
+      this.retainedSizeInBytes = INSTANCE_SIZE;
+    } else {
+      this.valueColumns = columnsCopyRequired ? valueColumns.clone() : valueColumns;
+    }
+  }
+
+  public boolean hasNext() {
+    return false;
+  }
+
+  public void next() {}
+
+  public TsBlockMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  public long getEndTime() {
+    return timeColumn.getEndTime();
+  }
+
+  public boolean isEmpty() {
+    return positionCount == 0;
+  }
+
+  public long getRetainedSizeInBytes() {
+    long retainedSizeInBytes = this.retainedSizeInBytes;
+    if (retainedSizeInBytes < 0) {
+      return updateRetainedSize();
+    }
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * @param positionOffset start offset
+   * @param length slice length
+   * @return view of current TsBlock start from positionOffset to positionOffset + length
+   */
+  public TsBlock getRegion(int positionOffset, int length) {
+    if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid position %s and length %s in page with %s positions",
+              positionOffset, length, positionCount));
+    }
+    int channelCount = getValueColumnCount();
+    Column[] slicedColumns = new Column[channelCount];
+    for (int i = 0; i < channelCount; i++) {
+      slicedColumns[i] = valueColumns[i].getRegion(positionOffset, length);
+    }
+    return wrapBlocksWithoutCopy(
+        length, (TimeColumn) timeColumn.getRegion(positionOffset, length), slicedColumns);
+  }
+
+  public TsBlock appendValueColumn(Column column) {
+    requireNonNull(column, "column is null");
+    if (positionCount != column.getPositionCount()) {
+      throw new IllegalArgumentException("Block does not have same position count");
+    }
+
+    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
+    newBlocks[valueColumns.length] = column;
+    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+  }
+
+  public long getTimeByIndex(int index) {
+    return timeColumn.getLong(index);
+  }
+
+  public int getValueColumnCount() {
+    return valueColumns.length;
+  }
+
+  public TimeColumn getTimeColumn() {
+    return timeColumn;
+  }
+
+  public Column getColumn(int columnIndex) {
+    return valueColumns[columnIndex];
+  }
+
+  public TsBlockIterator getTsBlockIterator() {
+    return new TsBlockIterator(0);
+  }
+
+  /** Only used for the batch data of vector time series. */
+  public IBatchDataIterator getBatchDataIterator(int subIndex) {
+    return new AlignedTsBlockIterator(0, subIndex);
+  }
+
+  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+
+    protected int index;
+
+    public TsBlockIterator(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < positionCount;
+    }
+
+    @Override
+    public boolean hasNext(long minBound, long maxBound) {
+      return hasNext();
+    }
+
+    @Override
+    public void next() {
+      index++;
+    }
+
+    @Override
+    public long currentTime() {
+      return timeColumn.getLong(index);
+    }
+
+    @Override
+    public Object currentValue() {
+      return valueColumns[0].getTsPrimitiveType(index).getValue();
+    }
+
+    @Override
+    public void reset() {
+      index = 0;
+    }
+
+    @Override
+    public int totalLength() {
+      return positionCount;
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      return hasNext();
+    }
+
+    @Override
+    public TimeValuePair nextTimeValuePair() {
+      TimeValuePair res = currentTimeValuePair();
+      next();
+      return res;
+    }
+
+    @Override
+    public TimeValuePair currentTimeValuePair() {
+      return new TimeValuePair(
+          timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  private class AlignedTsBlockIterator extends TsBlockIterator {
+
+    private final int subIndex;
+
+    private AlignedTsBlockIterator(int index, int subIndex) {
+      super(index);
+      this.subIndex = subIndex;
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (super.hasNext() && currentValue() == null) {
+        super.next();
+      }
+      return super.hasNext();
+    }
+
+    @Override
+    public boolean hasNext(long minBound, long maxBound) {
+      while (super.hasNext() && currentValue() == null) {
+        if (currentTime() < minBound || currentTime() >= maxBound) {
+          break;
+        }
+        super.next();
+      }
+      return super.hasNext();
+    }
+
+    @Override
+    public Object currentValue() {
+      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+      return v == null ? null : v.getValue();
+    }
+
+    @Override
+    public int totalLength() {
+      // aligned timeseries' BatchData length() may return the length of time column
+      // we need traverse to VectorBatchDataIterator calculate the actual value column's length
+      int cnt = 0;
+      int indexSave = index;
+      while (hasNext()) {
+        cnt++;
+        next();
+      }
+      index = indexSave;
+      return cnt;
+    }
+  }
+
+  private long updateRetainedSize() {
+    long retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueColumns);
+    retainedSizeInBytes += timeColumn.getRetainedSizeInBytes();
+    for (Column column : valueColumns) {
+      retainedSizeInBytes += column.getRetainedSizeInBytes();
+    }
+    this.retainedSizeInBytes = retainedSizeInBytes;
+    return retainedSizeInBytes;
+  }
+
+  private static int determinePositionCount(Column... columns) {
+    requireNonNull(columns, "columns is null");
+    if (columns.length == 0) {
+      throw new IllegalArgumentException("columns is empty");
+    }
+
+    return columns[0].getPositionCount();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
new file mode 100644
index 0000000..9e615cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.*;
+
+import java.util.List;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class TsBlockBuilder {
+
+  // We choose default initial size to be 8 for TsBlockBuilder and ColumnBuilder
+  // so the underlying data is larger than the object overhead, and the size is power of 2.
+  //
+  // This could be any other small number.
+  private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
+
+  private TimeColumnBuilder timeColumnBuilder;
+  private ColumnBuilder[] valueColumnBuilders;
+  private List<TSDataType> types;
+  private TsBlockBuilderStatus tsBlockBuilderStatus;
+  private int declaredPositions;
+
+  private TsBlockBuilder() {}
+
+  /**
+   * Create a TsBlockBuilder with given types.
+   *
+   * <p>A TsBlockBuilder instance created with this constructor has no estimation about bytes per
+   * entry, therefore it can resize frequently while appending new rows.
+   *
+   * <p>This constructor should only be used to get the initial TsBlockBuilder. Once the
+   * TsBlockBuilder is full use reset() or createTsBlockBuilderLike() to create a new TsBlockBuilder
+   * instance with its size estimated based on previous data.
+   */
+  public TsBlockBuilder(List<TSDataType> types) {
+    this(DEFAULT_INITIAL_EXPECTED_ENTRIES, types);
+  }
+
+  public TsBlockBuilder(int initialExpectedEntries, List<TSDataType> types) {
+    this(initialExpectedEntries, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, types);
+  }
+
+  public static TsBlockBuilder createWithOnlyTimeColumn() {
+    TsBlockBuilder res = new TsBlockBuilder();
+    res.tsBlockBuilderStatus = new TsBlockBuilderStatus(DEFAULT_INITIAL_EXPECTED_ENTRIES);
+    res.timeColumnBuilder =
+        new TimeColumnBuilder(
+            res.tsBlockBuilderStatus.createColumnBuilderStatus(), DEFAULT_INITIAL_EXPECTED_ENTRIES);
+    return res;
+  }
+
+  public static TsBlockBuilder withMaxTsBlockSize(int maxTsBlockBytes, List<TSDataType> types) {
+    return new TsBlockBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxTsBlockBytes, types);
+  }
+
+  private TsBlockBuilder(int initialExpectedEntries, int maxTsBlockBytes, List<TSDataType> types) {
+    this.types = requireNonNull(types, "types is null");
+
+    tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+    timeColumnBuilder =
+        new TimeColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      // TODO use Type interface to encapsulate createColumnBuilder to each concrete type class
+      // instead of switch-case
+      switch (types.get(i)) {
+        case BOOLEAN:
+          valueColumnBuilders[i] =
+              new BooleanColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case INT32:
+          valueColumnBuilders[i] =
+              new IntColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case INT64:
+          valueColumnBuilders[i] =
+              new LongColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case FLOAT:
+          valueColumnBuilders[i] =
+              new FloatColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case DOUBLE:
+          valueColumnBuilders[i] =
+              new DoubleColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case TEXT:
+          valueColumnBuilders[i] =
+              new BinaryColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown data type: " + types.get(i));
+      }
+    }
+  }
+
+  private TsBlockBuilder(
+      int maxTsBlockBytes,
+      List<TSDataType> types,
+      TimeColumnBuilder templateTimeColumnBuilder,
+      ColumnBuilder[] templateValueColumnBuilders) {
+    this.types = requireNonNull(types, "types is null");
+
+    tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+
+    checkArgument(
+        templateValueColumnBuilders.length == types.size(),
+        "Size of templates and types should match");
+    timeColumnBuilder =
+        (TimeColumnBuilder)
+            templateTimeColumnBuilder.newColumnBuilderLike(
+                tsBlockBuilderStatus.createColumnBuilderStatus());
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] =
+          templateValueColumnBuilders[i].newColumnBuilderLike(
+              tsBlockBuilderStatus.createColumnBuilderStatus());
+    }
+  }
+
+  public void buildValueColumnBuilders(List<TSDataType> types) {
+    this.types = requireNonNull(types, "types is null");
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+    int initialExpectedEntries = timeColumnBuilder.getPositionCount();
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      // TODO use Type interface to encapsulate createColumnBuilder to each concrete type class
+      // instead of switch-case
+      switch (types.get(i)) {
+        case BOOLEAN:
+          valueColumnBuilders[i] =
+              new BooleanColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case INT32:
+          valueColumnBuilders[i] =
+              new IntColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case INT64:
+          valueColumnBuilders[i] =
+              new LongColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case FLOAT:
+          valueColumnBuilders[i] =
+              new FloatColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case DOUBLE:
+          valueColumnBuilders[i] =
+              new DoubleColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        case TEXT:
+          valueColumnBuilders[i] =
+              new BinaryColumnBuilder(
+                  tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown data type: " + types.get(i));
+      }
+    }
+  }
+
+  public void reset() {
+    if (isEmpty()) {
+      return;
+    }
+    tsBlockBuilderStatus =
+        new TsBlockBuilderStatus(tsBlockBuilderStatus.getMaxTsBlockSizeInBytes());
+
+    declaredPositions = 0;
+
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] =
+          valueColumnBuilders[i].newColumnBuilderLike(
+              tsBlockBuilderStatus.createColumnBuilderStatus());
+    }
+  }
+
+  public TsBlockBuilder newTsBlockBuilderLike() {
+    return new TsBlockBuilder(
+        tsBlockBuilderStatus.getMaxTsBlockSizeInBytes(),
+        types,
+        timeColumnBuilder,
+        valueColumnBuilders);
+  }
+
+  public TimeColumnBuilder getTimeColumnBuilder() {
+    return timeColumnBuilder;
+  }
+
+  public ColumnBuilder getColumnBuilder(int channel) {
+    return valueColumnBuilders[channel];
+  }
+
+  public TSDataType getType(int channel) {
+    return types.get(channel);
+  }
+
+  public void declarePosition() {
+    declaredPositions++;
+  }
+
+  public void declarePositions(int positions) {
+    declaredPositions += positions;
+  }
+
+  public boolean isFull() {
+    return declaredPositions == Integer.MAX_VALUE || tsBlockBuilderStatus.isFull();
+  }
+
+  public boolean isEmpty() {
+    return declaredPositions == 0;
+  }
+
+  public int getPositionCount() {
+    return declaredPositions;
+  }
+
+  public long getSizeInBytes() {
+    return tsBlockBuilderStatus.getSizeInBytes();
+  }
+
+  public long getRetainedSizeInBytes() {
+    // We use a foreach loop instead of streams
+    // as it has much better performance.
+    long retainedSizeInBytes = timeColumnBuilder.getRetainedSizeInBytes();
+    for (ColumnBuilder columnBuilder : valueColumnBuilders) {
+      retainedSizeInBytes += columnBuilder.getRetainedSizeInBytes();
+    }
+    return retainedSizeInBytes;
+  }
+
+  public TsBlock build() {
+    if (valueColumnBuilders.length == 0) {
+      return new TsBlock(declaredPositions);
+    }
+    TimeColumn timeColumn = (TimeColumn) timeColumnBuilder.build();
+    if (timeColumn.getPositionCount() != declaredPositions) {
+      throw new IllegalStateException(
+          format(
+              "Declared positions (%s) does not match time column's number of entries (%s)",
+              declaredPositions, timeColumn.getPositionCount()));
+    }
+
+    Column[] columns = new Column[valueColumnBuilders.length];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = valueColumnBuilders[i].build();
+      if (columns[i].getPositionCount() != declaredPositions) {
+        throw new IllegalStateException(
+            format(
+                "Declared positions (%s) does not match column %s's number of entries (%s)",
+                declaredPositions, i, columns[i].getPositionCount()));
+      }
+    }
+
+    return TsBlock.wrapBlocksWithoutCopy(declaredPositions, timeColumn, columns);
+  }
+
+  private static void checkArgument(boolean expression, String errorMessage) {
+    if (!expression) {
+      throw new IllegalArgumentException(errorMessage);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
new file mode 100644
index 0000000..a48e9cd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilderStatus;
+
+public class TsBlockBuilderStatus {
+
+  public static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = 1024 * 1024;
+
+  private final int maxTsBlockSizeInBytes;
+
+  private long currentSize;
+
+  public TsBlockBuilderStatus() {
+    this(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+  }
+
+  public TsBlockBuilderStatus(int maxTsBlockSizeInBytes) {
+    this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+  }
+
+  public ColumnBuilderStatus createColumnBuilderStatus() {
+    return new ColumnBuilderStatus(this);
+  }
+
+  public int getMaxTsBlockSizeInBytes() {
+    return maxTsBlockSizeInBytes;
+  }
+
+  public boolean isEmpty() {
+    return currentSize == 0;
+  }
+
+  public boolean isFull() {
+    return currentSize >= maxTsBlockSizeInBytes;
+  }
+
+  public void addBytes(int bytes) {
+    if (bytes < 0) {
+      throw new IllegalArgumentException("bytes cannot be negative");
+    }
+    currentSize += bytes;
+  }
+
+  public long getSizeInBytes() {
+    return currentSize;
+  }
+
+  @Override
+  public String toString() {
+    return "TsBlockBuilderStatus{"
+        + "maxTsBlockSizeInBytes="
+        + maxTsBlockSizeInBytes
+        + ", currentSize="
+        + currentSize
+        + '}';
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
similarity index 97%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
index 7d3a5dd..0f16f1e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
+package org.apache.iotdb.tsfile.read.common.block;
 
 import java.util.List;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
new file mode 100644
index 0000000..cfb22c9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BinaryColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BinaryColumn.class).instanceSize();
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final Binary[] values;
+
+  private final long retainedSizeInBytes;
+
+  public BinaryColumn(int positionCount, Optional<boolean[]> valueIsNull, Binary[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  BinaryColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, Binary[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    // TODO we need to sum up all the Binary's retainedSize here
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public Binary getBinary(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsBinary(getBinary(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
new file mode 100644
index 0000000..c5c629f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BinaryColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BinaryColumnBuilder.class).instanceSize();
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private static final BinaryColumn NULL_VALUE_BLOCK =
+      new BinaryColumn(0, 1, new boolean[] {true}, new Binary[1]);
+
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private Binary[] values = new Binary[0];
+
+  private long arraysRetainedSizeInBytes;
+
+  public BinaryColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.initialEntryCount = max(expectedEntries, 1);
+    this.columnBuilderStatus = columnBuilderStatus;
+    updateArraysDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeBinary(Binary value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeBinary(value.getBinary());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    BinaryColumn column = (BinaryColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeBinary(column.getBinary(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new BinaryColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    // TODO we need to sum up all the Binary's retainedSize here
+    long size = INSTANCE_SIZE + arraysRetainedSizeInBytes;
+    if (columnBuilderStatus != null) {
+      size += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+    return size;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    // TODO we should take retain size into account here
+    return new BinaryColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateArraysDataSize();
+  }
+
+  private void updateArraysDataSize() {
+    arraysRetainedSizeInBytes = sizeOf(valueIsNull) + sizeOf(values);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
new file mode 100644
index 0000000..a2b8550
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BooleanColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BooleanColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Byte.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final boolean[] values;
+
+  private final long retainedSizeInBytes;
+
+  public BooleanColumn(int positionCount, Optional<boolean[]> valueIsNull, boolean[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  BooleanColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, boolean[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+  }
+
+  @Override
+  public boolean getBoolean(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsBoolean(getBoolean(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
new file mode 100644
index 0000000..5e7629d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class BooleanColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BooleanColumnBuilder.class).instanceSize();
+  private static final BooleanColumn NULL_VALUE_BLOCK =
+      new BooleanColumn(0, 1, new boolean[] {true}, new boolean[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private boolean[] values = new boolean[0];
+
+  private long retainedSizeInBytes;
+
+  public BooleanColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeBoolean(boolean value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeBoolean(value.getBoolean());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    BooleanColumn column = (BooleanColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeBoolean(column.getBoolean(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new BooleanColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new BooleanColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
new file mode 100644
index 0000000..adc06a1
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface Column {
+
+  /** Gets a boolean at {@code position}. */
+  default boolean getBoolean(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a little endian int at {@code position}. */
+  default int getInt(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a little endian long at {@code position}. */
+  default long getLong(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a float at {@code position}. */
+  default float getFloat(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a double at {@code position}. */
+  default double getDouble(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a Binary at {@code position}. */
+  default Binary getBinary(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a TsPrimitiveType at {@code position}. */
+  default TsPrimitiveType getTsPrimitiveType(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Is the specified position null?
+   *
+   * @throws IllegalArgumentException if this position is not valid. The method may return false
+   *     without throwing exception when there are no nulls in the block, even if the position is
+   *     invalid
+   */
+  boolean isNull(int position);
+
+  /** Returns the number of positions in this block. */
+  int getPositionCount();
+
+  /**
+   * Returns the retained size of this column in memory, including over-allocations. This method is
+   * called from the inner most execution loop and must be fast.
+   */
+  long getRetainedSizeInBytes();
+
+  /**
+   * Returns a column starting at the specified position and extends for the specified length. The
+   * specified region must be entirely contained within this column.
+   *
+   * <p>The region can be a view over this column. If this column is released, the region column may
+   * also be released. If the region column is released, this block may also be released.
+   */
+  Column getRegion(int positionOffset, int length);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
new file mode 100644
index 0000000..a16e5fa
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface ColumnBuilder {
+
+  /** Write a boolean to the current entry; */
+  default ColumnBuilder writeBoolean(boolean value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a short to the current entry; */
+  default ColumnBuilder writeInt(int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a int to the current entry; */
+  default ColumnBuilder writeLong(long value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a long to the current entry; */
+  default ColumnBuilder writeFloat(float value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a byte sequences to the current entry; */
+  default ColumnBuilder writeDouble(double value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a Binary to the current entry; */
+  default ColumnBuilder writeBinary(Binary value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Write a TsPrimitiveType sequences to the current entry; */
+  default ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder);
+
+  /** Appends a null value to the block. */
+  ColumnBuilder appendNull();
+
+  /** Builds the block. This method can be called multiple times. */
+  Column build();
+
+  /**
+   * Returns the retained size of this column in memory, including over-allocations. This method is
+   * called from the inner most execution loop and must be fast.
+   */
+  long getRetainedSizeInBytes();
+
+  /**
+   * Creates a new column builder of the same type based on the current usage statistics of this
+   * column builder.
+   */
+  ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
new file mode 100644
index 0000000..d0f62a2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnBuilderStatus {
+
+  public static final int INSTANCE_SIZE = deepInstanceSize(ColumnBuilderStatus.class);
+
+  private final TsBlockBuilderStatus tsBlockBuilderStatus;
+
+  private int currentSize;
+
+  public ColumnBuilderStatus(TsBlockBuilderStatus tsBlockBuilderStatus) {
+    this.tsBlockBuilderStatus =
+        requireNonNull(tsBlockBuilderStatus, "tsBlockBuilderStatus must not be null");
+  }
+
+  public int getMaxTsBlockSizeInBytes() {
+    return tsBlockBuilderStatus.getMaxTsBlockSizeInBytes();
+  }
+
+  public void addBytes(int bytes) {
+    currentSize += bytes;
+    tsBlockBuilderStatus.addBytes(bytes);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnBuilderStatus{" + ", currentSize=" + currentSize + '}';
+  }
+
+  /**
+   * Computes the size of an instance of this class assuming that all reference fields are non-null
+   */
+  private static int deepInstanceSize(Class<?> clazz) {
+    if (clazz.isArray()) {
+      throw new IllegalArgumentException(
+          format(
+              "Cannot determine size of %s because it contains an array", clazz.getSimpleName()));
+    }
+    if (clazz.isInterface()) {
+      throw new IllegalArgumentException(format("%s is an interface", clazz.getSimpleName()));
+    }
+    if (Modifier.isAbstract(clazz.getModifiers())) {
+      throw new IllegalArgumentException(format("%s is abstract", clazz.getSimpleName()));
+    }
+    if (!clazz.getSuperclass().equals(Object.class)) {
+      throw new IllegalArgumentException(
+          format(
+              "Cannot determine size of a subclass. %s extends from %s",
+              clazz.getSimpleName(), clazz.getSuperclass().getSimpleName()));
+    }
+
+    int size = ClassLayout.parseClass(clazz).instanceSize();
+    for (Field field : clazz.getDeclaredFields()) {
+      if (!field.getType().isPrimitive()) {
+        size += deepInstanceSize(field.getType());
+      }
+    }
+    return size;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
new file mode 100644
index 0000000..150f713
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import static java.lang.Math.ceil;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnUtil {
+
+  private static final double BLOCK_RESET_SKEW = 1.25;
+
+  private static final int DEFAULT_CAPACITY = 64;
+  // See java.util.ArrayList for an explanation
+  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private ColumnUtil() {}
+
+  static void checkArrayRange(int[] array, int offset, int length) {
+    requireNonNull(array, "array is null");
+    if (offset < 0 || length < 0 || offset + length > array.length) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid offset %s and length %s in array with %s elements",
+              offset, length, array.length));
+    }
+  }
+
+  static void checkValidRegion(int positionCount, int positionOffset, int length) {
+    if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid position %s and length %s in block with %s positions",
+              positionOffset, length, positionCount));
+    }
+  }
+
+  static void checkValidPositions(boolean[] positions, int positionCount) {
+    if (positions.length != positionCount) {
+      throw new IllegalArgumentException(
+          format(
+              "Invalid positions array size %d, actual position count is %d",
+              positions.length, positionCount));
+    }
+  }
+
+  static void checkValidPosition(int position, int positionCount) {
+    if (position < 0 || position >= positionCount) {
+      throw new IllegalArgumentException(
+          format("Invalid position %s in block with %s positions", position, positionCount));
+    }
+  }
+
+  static int calculateNewArraySize(int currentSize) {
+    // grow array by 50%
+    long newSize = (long) currentSize + (currentSize >> 1);
+
+    // verify new size is within reasonable bounds
+    if (newSize < DEFAULT_CAPACITY) {
+      newSize = DEFAULT_CAPACITY;
+    } else if (newSize > MAX_ARRAY_SIZE) {
+      newSize = MAX_ARRAY_SIZE;
+      if (newSize == currentSize) {
+        throw new IllegalArgumentException(format("Cannot grow array beyond '%s'", MAX_ARRAY_SIZE));
+      }
+    }
+    return (int) newSize;
+  }
+
+  static int calculateBlockResetSize(int currentSize) {
+    long newSize = (long) ceil(currentSize * BLOCK_RESET_SKEW);
+
+    // verify new size is within reasonable bounds
+    if (newSize < DEFAULT_CAPACITY) {
+      newSize = DEFAULT_CAPACITY;
+    } else if (newSize > MAX_ARRAY_SIZE) {
+      newSize = MAX_ARRAY_SIZE;
+    }
+    return (int) newSize;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
new file mode 100644
index 0000000..d16e0c0
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(DoubleColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Double.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final double[] values;
+
+  private final long retainedSizeInBytes;
+
+  public DoubleColumn(int positionCount, Optional<boolean[]> valueIsNull, double[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  DoubleColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, double[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+  }
+
+  @Override
+  public double getDouble(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsDouble(getDouble(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
new file mode 100644
index 0000000..a01f6f2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(DoubleColumnBuilder.class).instanceSize();
+  private static final DoubleColumn NULL_VALUE_BLOCK =
+      new DoubleColumn(0, 1, new boolean[] {true}, new double[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private double[] values = new double[0];
+
+  private long retainedSizeInBytes;
+
+  public DoubleColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeDouble(double value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeDouble(value.getDouble());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    DoubleColumn column = (DoubleColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeDouble(column.getDouble(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new DoubleColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new DoubleColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
new file mode 100644
index 0000000..6543bde
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(FloatColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Float.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final float[] values;
+
+  private final long retainedSizeInBytes;
+
+  public FloatColumn(int positionCount, Optional<boolean[]> valueIsNull, float[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  FloatColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, float[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
+  }
+
+  @Override
+  public float getFloat(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsFloat(getFloat(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
new file mode 100644
index 0000000..ff89ea2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(FloatColumnBuilder.class).instanceSize();
+  private static final FloatColumn NULL_VALUE_BLOCK =
+      new FloatColumn(0, 1, new boolean[] {true}, new float[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private float[] values = new float[0];
+
+  private long retainedSizeInBytes;
+
+  public FloatColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeFloat(float value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeFloat(value.getFloat());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    FloatColumn column = (FloatColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeFloat(column.getFloat(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new FloatColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new FloatColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
new file mode 100644
index 0000000..4ad843f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class IntColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(IntColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final int[] values;
+
+  private final long retainedSizeInBytes;
+
+  public IntColumn(int positionCount, Optional<boolean[]> valueIsNull, int[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  IntColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, int[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public int getInt(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsInt(getInt(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
new file mode 100644
index 0000000..9ab3f7f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class IntColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(IntColumnBuilder.class).instanceSize();
+  private static final IntColumn NULL_VALUE_BLOCK =
+      new IntColumn(0, 1, new boolean[] {true}, new int[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private int[] values = new int[0];
+
+  private long retainedSizeInBytes;
+
+  public IntColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeInt(int value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeInt(value.getInt());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    IntColumn column = (IntColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeInt(column.getInt(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new IntColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new IntColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
new file mode 100644
index 0000000..9ab9edd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class LongColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final long[] values;
+
+  private final long retainedSizeInBytes;
+
+  public LongColumn(int positionCount, Optional<boolean[]> valueIsNull, long[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  LongColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, long[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsLong(getLong(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
new file mode 100644
index 0000000..9053913
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class LongColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(LongColumnBuilder.class).instanceSize();
+  private static final LongColumn NULL_VALUE_BLOCK =
+      new LongColumn(0, 1, new boolean[] {true}, new long[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private long[] values = new long[0];
+
+  private long retainedSizeInBytes;
+
+  public LongColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeLong(long value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeLong(value.getLong());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int index = offset;
+    LongColumn column = (LongColumn) valueColumn;
+    for (int i = 0; i < count; i++) {
+      if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        writeLong(column.getLong(index++));
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new LongColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new LongColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
new file mode 100644
index 0000000..3977500
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+
+public class RunLengthEncodedColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(RunLengthEncodedColumn.class).instanceSize();
+
+  private final Column value;
+  private final int positionCount;
+
+  public RunLengthEncodedColumn(Column value, int positionCount) {
+    requireNonNull(value, "value is null");
+    if (value.getPositionCount() != 1) {
+      throw new IllegalArgumentException(
+          format(
+              "Expected value to contain a single position but has %s positions",
+              value.getPositionCount()));
+    }
+
+    if (value instanceof RunLengthEncodedColumn) {
+      this.value = ((RunLengthEncodedColumn) value).getValue();
+    } else {
+      this.value = value;
+    }
+
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+
+    this.positionCount = positionCount;
+  }
+
+  public Column getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean getBoolean(int position) {
+    checkReadablePosition(position);
+    return value.getBoolean(position);
+  }
+
+  @Override
+  public int getInt(int position) {
+    checkReadablePosition(position);
+    return value.getInt(position);
+  }
+
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return value.getLong(position);
+  }
+
+  @Override
+  public float getFloat(int position) {
+    checkReadablePosition(position);
+    return value.getFloat(position);
+  }
+
+  @Override
+  public double getDouble(int position) {
+    checkReadablePosition(position);
+    return value.getDouble(position);
+  }
+
+  @Override
+  public Binary getBinary(int position) {
+    checkReadablePosition(position);
+    return value.getBinary(position);
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return value.getTsPrimitiveType(position);
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return value.isNull(0);
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return INSTANCE_SIZE + value.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(positionCount, positionOffset, length);
+    return new RunLengthEncodedColumn(value, length);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= positionCount) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
new file mode 100644
index 0000000..c8d4748
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class TimeColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final long[] values;
+
+  private final long retainedSizeInBytes;
+
+  public TimeColumn(int positionCount, long[] values) {
+    this(0, positionCount, values);
+  }
+
+  TimeColumn(int arrayOffset, int positionCount, long[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
+  }
+
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    return false;
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new TimeColumn(positionOffset + arrayOffset, length, values);
+  }
+
+  public long getEndTime() {
+    return values[getPositionCount() + arrayOffset - 1];
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
new file mode 100644
index 0000000..9b455cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class TimeColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(TimeColumnBuilder.class).instanceSize();
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+
+  private long[] values = new long[0];
+
+  private long retainedSizeInBytes;
+
+  public TimeColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeLong(long value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(TimeColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  @Override
+  public Column build() {
+    return new TimeColumn(0, positionCount, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new TimeColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  public long getTime(int position) {
+    checkReadablePosition(position);
+    return values[position];
+  }
+
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index db76550..3affee8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.tsfile.read.reader;
 
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 2efe6802..dd75b21 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -25,7 +25,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
@@ -105,8 +107,38 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
-    // TODO need to implement for mpp
-    throw new IllegalStateException("We have not implemented this method.");
+    // TODO change from the row-based style to column-based style
+    TsBlockBuilder builder =
+        new TsBlockBuilder(
+            valuePageReaderList.stream()
+                .map(ValuePageReader::getDataType)
+                .collect(Collectors.toList()));
+    int timeIndex = -1;
+    while (timePageReader.hasNextTime()) {
+      long timestamp = timePageReader.nextTime();
+      timeIndex++;
+      // if all the sub sensors' value are null in current row, just discard it
+      boolean isNull = true;
+      Object notNullObject = null;
+      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
+      for (int i = 0; i < v.length; i++) {
+        ValuePageReader pageReader = valuePageReaderList.get(i);
+        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
+        if (v[i] != null) {
+          isNull = false;
+          notNullObject = v[i].getValue();
+        }
+      }
+      // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
+      // sensor
+      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
+        builder.getTimeColumnBuilder().writeLong(timestamp);
+        for (int i = 0; i < v.length; i++) {
+          builder.getColumnBuilder(i).writeTsPrimitiveType(v[i]);
+        }
+      }
+    }
+    return builder.build();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index c1a4c59..ec5ff85 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -26,7 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
@@ -35,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 public class PageReader implements IPageReader {
@@ -157,53 +161,76 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
     // TODO we still need to consider data type, ascending and descending here
-    TsBlock tsBlock = new TsBlock();
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {
-      while (timeDecoder.hasNext(timeBuffer)) {
-        long timestamp = timeDecoder.readLong(timeBuffer);
-        switch (dataType) {
-          case BOOLEAN:
+      switch (dataType) {
+        case BOOLEAN:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
-              //              tsBlock.putBoolean(timestamp, aBoolean);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeBoolean(aBoolean);
             }
-            break;
-          case INT32:
+          }
+          break;
+        case INT32:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             int anInt = valueDecoder.readInt(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
-              //              tsBlock.putInt(timestamp, anInt);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeInt(anInt);
             }
-            break;
-          case INT64:
+          }
+          break;
+        case INT64:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             long aLong = valueDecoder.readLong(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
-              //              tsBlock.putLong(timestamp, aLong);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeLong(aLong);
             }
-            break;
-          case FLOAT:
+          }
+          break;
+        case FLOAT:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             float aFloat = valueDecoder.readFloat(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
-              //              tsBlock.putFloat(timestamp, aFloat);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeFloat(aFloat);
             }
-            break;
-          case DOUBLE:
+          }
+          break;
+        case DOUBLE:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             double aDouble = valueDecoder.readDouble(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
-              //              tsBlock.putDouble(timestamp, aDouble);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeDouble(aDouble);
             }
-            break;
-          case TEXT:
+          }
+          break;
+        case TEXT:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             Binary aBinary = valueDecoder.readBinary(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
-              //              tsBlock.putBinary(timestamp, aBinary);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeBinary(aBinary);
             }
-            break;
-          default:
-            throw new UnSupportedDataTypeException(String.valueOf(dataType));
-        }
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return tsBlock;
+    return tsBlockBuilder.build();
   }
 
   @Override