You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/28 06:33:35 UTC

[iotdb] 01/01: TsBlock pre implementation

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e61858d38da8e6753a5f1ba314c3aaab806a4f18
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Mar 28 11:35:54 2022 +0800

    TsBlock pre implementation
---
 pom.xml                                            |   5 +
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |   2 +-
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  |   2 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   2 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   2 +-
 .../db/mpp/operator/process/AggregateOperator.java |   2 +-
 .../mpp/operator/process/DeviceMergeOperator.java  |   2 +-
 .../db/mpp/operator/process/FillOperator.java      |   2 +-
 .../mpp/operator/process/FilterNullOperator.java   |   2 +-
 .../mpp/operator/process/GroupByLevelOperator.java |   2 +-
 .../db/mpp/operator/process/LimitOperator.java     |   6 +-
 .../db/mpp/operator/process/OffsetOperator.java    |   2 +-
 .../db/mpp/operator/process/SortOperator.java      |   2 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  36 ++-
 .../db/mpp/operator/sink/FragmentSinkOperator.java |   2 +-
 .../iotdb/db/mpp/operator/sink/SinkOperator.java   |   2 +-
 .../mpp/operator/source/AlignedSeriesScanUtil.java |  88 ++++++
 .../source/SeriesAggregateScanOperator.java        |   2 +-
 .../db/mpp/operator/source/SeriesScanOperator.java |   4 +-
 .../db/mpp/operator/source/SeriesScanUtil.java     |  56 ++--
 .../query/reader/chunk/MemAlignedPageReader.java   |  38 ++-
 .../iotdb/db/query/reader/chunk/MemPageReader.java | 113 +++++---
 tsfile/pom.xml                                     |   4 +
 .../apache/iotdb/tsfile/read/common/Column.java    |  21 --
 .../iotdb/tsfile/read/common/TimeColumn.java       |  21 --
 .../apache/iotdb/tsfile/read/common/TsBlock.java   | 175 ------------
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 317 +++++++++++++++++++++
 .../tsfile/read/common/block/TsBlockBuilder.java   | 295 +++++++++++++++++++
 .../read/common/block/TsBlockBuilderStatus.java    |  75 +++++
 .../read/common/{ => block}/TsBlockMetadata.java   |   2 +-
 .../read/common/block/column/BinaryColumn.java     | 110 +++++++
 .../common/block/column/BinaryColumnBuilder.java   | 148 ++++++++++
 .../read/common/block/column/BooleanColumn.java    | 109 +++++++
 .../common/block/column/BooleanColumnBuilder.java  | 150 ++++++++++
 .../tsfile/read/common/block/column/Column.java    |  87 ++++++
 .../read/common/block/column/ColumnBuilder.java    |  81 ++++++
 .../common/block/column/ColumnBuilderStatus.java   |  88 ++++++
 .../read/common/block/column/ColumnUtil.java       |  97 +++++++
 .../read/common/block/column/DoubleColumn.java     | 109 +++++++
 .../common/block/column/DoubleColumnBuilder.java   | 150 ++++++++++
 .../read/common/block/column/FloatColumn.java      | 108 +++++++
 .../common/block/column/FloatColumnBuilder.java    | 150 ++++++++++
 .../tsfile/read/common/block/column/IntColumn.java | 108 +++++++
 .../read/common/block/column/IntColumnBuilder.java | 150 ++++++++++
 .../read/common/block/column/LongColumn.java       | 108 +++++++
 .../common/block/column/LongColumnBuilder.java     | 150 ++++++++++
 .../block/column/RunLengthEncodedColumn.java       | 133 +++++++++
 .../read/common/block/column/TimeColumn.java       |  95 ++++++
 .../common/block/column/TimeColumnBuilder.java     | 126 ++++++++
 .../iotdb/tsfile/read/reader/IPageReader.java      |   2 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |  38 ++-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  81 ++++--
 52 files changed, 3323 insertions(+), 339 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3f1c875..f19c018 100644
--- a/pom.xml
+++ b/pom.xml
@@ -506,6 +506,11 @@
                 <version>${airline.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.openjdk.jol</groupId>
+                <artifactId>jol-core</artifactId>
+                <version>0.2</version>
+            </dependency>
+            <dependency>
                 <groupId>org.fusesource.mqtt-client</groupId>
                 <artifactId>mqtt-client</artifactId>
                 <version>1.12</version>
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 0d64cbc..6b4e07c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index b493759..ef2919c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 0216556..5f8607e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index d545c4f..df89218 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index c439ad4..a08fb68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index b9fce3a..ebfb54e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index e2732cf..52a3fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index d8a33ba..0083ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 6e19233..285427a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 47c81bc..25beb6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -56,8 +56,8 @@ public class LimitOperator implements ProcessOperator {
   public TsBlock next() throws IOException {
     TsBlock block = child.next();
     TsBlock res = block;
-    if (block.getCount() <= remainingLimit) {
-      remainingLimit -= block.getCount();
+    if (block.getPositionCount() <= remainingLimit) {
+      remainingLimit -= block.getPositionCount();
     } else {
       res = block.getRegion(0, (int) remainingLimit);
       remainingLimit = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index e9eef20..a22985c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index f0ac49b..5e2f0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index c5665ee..60fa4f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -22,8 +22,13 @@ import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.read.common.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -48,11 +53,14 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private final int columnCount;
 
+  private final List<TSDataType> dataTypes;
+
   public TimeJoinOperator(
       OperatorContext operatorContext,
       List<Operator> children,
       OrderBy mergeOrder,
-      int columnCount) {
+      int columnCount,
+      List<TSDataType> dataTypes) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.inputCount = children.size();
@@ -61,6 +69,7 @@ public class TimeJoinOperator implements ProcessOperator {
     this.noMoreTsBlocks = new boolean[this.inputCount];
     this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
     this.columnCount = columnCount;
+    this.dataTypes = dataTypes;
   }
 
   @Override
@@ -92,7 +101,7 @@ public class TimeJoinOperator implements ProcessOperator {
         inputIndex[i] = 0;
         inputTsBlocks[i] = children.get(i).next();
         if (!empty(i)) {
-          int rowSize = inputTsBlocks[i].getCount();
+          int rowSize = inputTsBlocks[i].getPositionCount();
           for (int row = 0; row < rowSize; row++) {
             timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
           }
@@ -113,22 +122,29 @@ public class TimeJoinOperator implements ProcessOperator {
       return null;
     }
 
-    TsBlock res = new TsBlock(columnCount);
+    TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) {
-      res.addTime(timeSelector.pollFirst());
+      timeBuilder.writeLong(timeSelector.pollFirst());
     }
 
+    tsBlockBuilder.buildValueColumnBuilders(dataTypes);
+
     for (int i = 0, column = 0; i < inputCount; i++) {
       TsBlock block = inputTsBlocks[i];
       TimeColumn timeColumn = block.getTimeColumn();
       int valueColumnCount = block.getValueColumnCount();
       int startIndex = inputIndex[i];
       for (int j = 0; j < valueColumnCount; j++) {
-        inputIndex[i] =
-            res.addValues(column++, timeColumn, block.getColumn(j), startIndex, currentEndTime);
+        startIndex = inputIndex[i];
+        ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(column++);
+        Column valueColumn = block.getColumn(j);
+        startIndex = columnBuilder.appendColumn(timeColumn, valueColumn, startIndex, timeBuilder);
       }
+      inputIndex[i] = startIndex;
     }
-    return res;
+    return tsBlockBuilder.build();
   }
 
   @Override
@@ -156,6 +172,6 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private boolean empty(int columnIndex) {
     return inputTsBlocks[columnIndex] == null
-        || inputTsBlocks[columnIndex].getCount() == inputIndex[columnIndex];
+        || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex];
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index 7be5cfa..7bbccbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.sink;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index e5a3e15..b691db0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.sink;
 
 import org.apache.iotdb.db.mpp.operator.Operator;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 public interface SinkOperator extends Operator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
new file mode 100644
index 0000000..9fd48c7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.source;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+/** {@link SeriesScanUtil} for aligned series; seriesPath is cast to AlignedPath throughout. */
+public class AlignedSeriesScanUtil extends SeriesScanUtil {
+  /** Types of the value columns, collected from the aligned path's schema list. */
+  private final List<TSDataType> dataTypes;
+
+  public AlignedSeriesScanUtil(
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      FragmentInstanceContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    super(
+        seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter, ascending);
+    dataTypes =
+        ((AlignedPath) seriesPath)
+            .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+  }
+
+  @Override
+  protected PriorityMergeReader getPriorityMergeReader() {
+    return new AlignedPriorityMergeReader();
+  }
+
+  @Override
+  protected DescPriorityMergeReader getDescPriorityMergeReader() {
+    return new AlignedDescPriorityMergeReader();
+  }
+
+  @Override
+  protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+      TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(
+        resource, (AlignedPath) seriesPath, context, filter);
+  }
+
+  @Override
+  protected List<TSDataType> getTsDataTypeList() {
+    return dataTypes;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 0ef6b09..87151ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 19aa191..e84e649 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -127,6 +127,6 @@ public class SeriesScanOperator implements Operator {
   }
 
   private boolean isEmpty(TsBlock tsBlock) {
-    return tsBlock == null || !tsBlock.hasNext();
+    return tsBlock == null || tsBlock.isEmpty();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 9e774dc..e3d2f9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -35,7 +35,9 @@ import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -389,7 +391,7 @@ public class SeriesScanUtil {
     } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
       if (hasNextOverlappedPage()) {
         cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
           hasCachedNextOverlappedPage = true;
           return true;
         }
@@ -440,7 +442,7 @@ public class SeriesScanUtil {
        */
       if (hasNextOverlappedPage()) {
         cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && cachedTsBlock.hasNext()) {
+        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
           hasCachedNextOverlappedPage = true;
           return true;
         }
@@ -644,7 +646,8 @@ public class SeriesScanUtil {
       if (mergeReader.hasNextTimeValuePair()) {
 
         // TODO we still need to consider data type, ascending and descending here
-        cachedTsBlock = new TsBlock();
+        TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
+        TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
         long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
         while (mergeReader.hasNextTimeValuePair()) {
 
@@ -662,7 +665,7 @@ public class SeriesScanUtil {
              * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
              * we could use the first sequence page reader later
              */
-            if (cachedTsBlock.hasNext() || firstPageReader != null || !seqPageReaders.isEmpty()) {
+            if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) {
               break;
             }
             // so, we don't have other data except mergeReader
@@ -689,7 +692,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < firstPageReader.getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+              hasCachedNextOverlappedPage = !builder.isEmpty();
+              cachedTsBlock = builder.build();
               return hasCachedNextOverlappedPage;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
@@ -716,7 +720,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < seqPageReaders.get(0).getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+              hasCachedNextOverlappedPage = !builder.isEmpty();
+              cachedTsBlock = builder.build();
               return hasCachedNextOverlappedPage;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
@@ -752,37 +757,40 @@ public class SeriesScanUtil {
 
           if (valueFilter == null
               || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+            builder.declarePosition();
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
             switch (dataType) {
               case BOOLEAN:
-                //            tsBlock.putBoolean(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getBoolean());
+                builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
                 break;
               case INT32:
-                //            tsBlock.putInt(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getInt());
+                builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
                 break;
               case INT64:
-                //            tsBlock.putLong(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getLong());
+                builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
                 break;
               case FLOAT:
-                //            tsBlock.putFloat(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getFloat());
+                builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
                 break;
               case DOUBLE:
-                //            tsBlock.putDouble(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getDouble());
+                builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
                 break;
               case TEXT:
-                //            tsBlock.putBinary(timeValuePair.getTimestamp(),
-                // timeValuePair.getValue().getBinary());
+                builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+                break;
+              case VECTOR:
+                TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                for (int i = 0; i < values.length; i++) {
+                  builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                }
                 break;
               default:
                 throw new UnSupportedDataTypeException(String.valueOf(dataType));
             }
           }
         }
-        hasCachedNextOverlappedPage = cachedTsBlock.hasNext();
+        hasCachedNextOverlappedPage = !builder.isEmpty(); // cached only if rows were written
+        cachedTsBlock = builder.build();
         /*
          * if current overlapped page has valid data, return, otherwise read next overlapped page
          */
@@ -1006,7 +1014,7 @@ public class SeriesScanUtil {
     }
   }
 
-  protected void unpackSeqTsFileResource() throws IOException {
+  private void unpackSeqTsFileResource() throws IOException {
     ITimeSeriesMetadata timeseriesMetadata =
         loadTimeSeriesMetadata(
             orderUtils.getNextSeqFileResource(true),
@@ -1020,7 +1028,7 @@ public class SeriesScanUtil {
     }
   }
 
-  protected void unpackUnseqTsFileResource() throws IOException {
+  private void unpackUnseqTsFileResource() throws IOException {
     ITimeSeriesMetadata timeseriesMetadata =
         loadTimeSeriesMetadata(
             orderUtils.getNextUnseqFileResource(true),
@@ -1046,6 +1054,10 @@ public class SeriesScanUtil {
         resource, seriesPath, context, filter, allSensors);
   }
 
+  protected List<TSDataType> getTsDataTypeList() {
+    return Collections.singletonList(dataType);
+  }
+
   protected Filter getAnyFilter() {
     return timeFilter != null ? timeFilter : valueFilter;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index 7216ea7..e7ebc1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.query.reader.chunk;
 
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -33,6 +35,7 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
 
 public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
 
@@ -81,8 +84,37 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
-    // TODO need to implement for mpp
-    throw new IllegalStateException("We have not implemented this method.");
+    // TODO change from the row-based style to column-based style
+    TsBlockBuilder tsBlockBuilder =
+        new TsBlockBuilder(
+            chunkMetadata.getValueChunkMetadataList().stream()
+                .map(IChunkMetadata::getDataType)
+                .collect(Collectors.toList()));
+    while (timeValuePairIterator.hasNextTimeValuePair()) {
+      TimeValuePair row = timeValuePairIterator.nextTimeValuePair();
+      long timestamp = row.getTimestamp();
+      TsPrimitiveType[] subValues = row.getValue().getVector();
+      // position of the first sub sensor that carries a value in this row
+      int pos = 0;
+      while (pos < subValues.length && subValues[pos] == null) {
+        pos++;
+      }
+      Object firstNotNullObject = pos < subValues.length ? subValues[pos].getValue() : null;
+      // discard the row when all the sub sensors' values are null at this time
+      if (firstNotNullObject == null) {
+        continue;
+      }
+      // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will only
+      // accept AlignedPath with only one sub sensor
+      if (valueFilter != null && !valueFilter.satisfy(timestamp, firstNotNullObject)) {
+        continue;
+      }
+      tsBlockBuilder.getTimeColumnBuilder().writeLong(timestamp);
+      for (int col = 0; col < subValues.length; col++) {
+        tsBlockBuilder.getColumnBuilder(col).writeTsPrimitiveType(subValues[col]);
+      }
+    }
+    return tsBlockBuilder.build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 79997b5..16704a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -25,13 +25,17 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
+import java.util.Collections;
 
 public class MemPageReader implements IPageReader {
 
@@ -66,43 +70,80 @@ public class MemPageReader implements IPageReader {
     TSDataType dataType = chunkMetadata.getDataType();
     // TODO we still need to consider data type, ascending and descending here
 
-    TsBlock tsBlock = new TsBlock();
-    while (timeValuePairIterator.hasNextTimeValuePair()) {
-      TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
-      if (valueFilter == null
-          || valueFilter.satisfy(
-              timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
-        switch (dataType) {
-          case BOOLEAN:
-            //            tsBlock.putBoolean(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getBoolean());
-            break;
-          case INT32:
-            //            tsBlock.putInt(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getInt());
-            break;
-          case INT64:
-            //            tsBlock.putLong(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getLong());
-            break;
-          case FLOAT:
-            //            tsBlock.putFloat(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getFloat());
-            break;
-          case DOUBLE:
-            //            tsBlock.putDouble(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getDouble());
-            break;
-          case TEXT:
-            //            tsBlock.putBinary(timeValuePair.getTimestamp(),
-            // timeValuePair.getValue().getBinary());
-            break;
-          default:
-            throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
+    switch (dataType) {
+      case BOOLEAN:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeBoolean(timeValuePair.getValue().getBoolean());
+          }
         }
-      }
+        break;
+      case INT32:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeInt(timeValuePair.getValue().getInt());
+          }
+        }
+        break;
+      case INT64:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeLong(timeValuePair.getValue().getLong());
+          }
+        }
+        break;
+      case FLOAT:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeFloat(timeValuePair.getValue().getFloat());
+          }
+        }
+        break;
+      case DOUBLE:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeDouble(timeValuePair.getValue().getDouble());
+          }
+        }
+        break;
+      case TEXT:
+        while (timeValuePairIterator.hasNextTimeValuePair()) {
+          TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+          if (valueFilter == null
+              || valueFilter.satisfy(
+                  timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+            timeBuilder.writeLong(timeValuePair.getTimestamp());
+            valueBuilder.writeBinary(timeValuePair.getValue().getBinary());
+          }
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
     }
-    return tsBlock;
+    return tsBlockBuilder.build();
   }
 
   @Override
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index beb2149..147ed65 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -68,6 +68,10 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
deleted file mode 100644
index c4933e1..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Column.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public interface Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
deleted file mode 100644
index 19937f6..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-public class TimeColumn implements Column {}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
deleted file mode 100644
index a6bffbf..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlock.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.read.common;
-
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
-
-import static java.lang.String.format;
-
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
-
-  // Describe the column info
-  private TsBlockMetadata metadata;
-
-  private TimeColumn timeColumn;
-
-  private Column[] valueColumns;
-
-  private int count;
-
-  public TsBlock() {}
-
-  public TsBlock(int columnCount) {
-    timeColumn = new TimeColumn();
-    valueColumns = new Column[columnCount];
-  }
-
-  public boolean hasNext() {
-    return false;
-  }
-
-  public void next() {}
-
-  public TsBlockMetadata getMetadata() {
-    return metadata;
-  }
-
-  public int getCount() {
-    return count;
-  }
-
-  /** TODO need to be implemented after the data structure being defined */
-  public long getEndTime() {
-    return -1;
-  }
-
-  public boolean isEmpty() {
-    return count == 0;
-  }
-
-  /**
-   * TODO has not been implemented yet
-   *
-   * @param positionOffset start offset
-   * @param length slice length
-   * @return view of current TsBlock start from positionOffset to positionOffset + length
-   */
-  public TsBlock getRegion(int positionOffset, int length) {
-    if (positionOffset < 0 || length < 0 || positionOffset + length > count) {
-      throw new IndexOutOfBoundsException(
-          format(
-              "Invalid position %s and length %s in page with %s positions",
-              positionOffset, length, count));
-    }
-    return this;
-  }
-
-  /** TODO need to be implemented after the data structure being defined */
-  public long getTimeByIndex(int index) {
-    return -1;
-  }
-
-  public void addTime(long time) {}
-
-  public int getValueColumnCount() {
-    return valueColumns.length;
-  }
-
-  public TimeColumn getTimeColumn() {
-    return timeColumn;
-  }
-
-  public Column getColumn(int columnIndex) {
-    return valueColumns[columnIndex];
-  }
-
-  public int addValues(
-      int columnIndex, TimeColumn timeColumn, Column valueColumn, int rowIndex, long endTime) {
-
-    return rowIndex;
-  }
-
-  public TsBlockIterator getTsBlockIterator() {
-    return new TsBlockIterator();
-  }
-
-  // TODO need to be implemented when the data structure is defined
-  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
-
-    @Override
-    public boolean hasNext() {
-      return TsBlock.this.hasNext();
-    }
-
-    @Override
-    public boolean hasNext(long minBound, long maxBound) {
-      return hasNext();
-    }
-
-    @Override
-    public void next() {
-      TsBlock.this.next();
-    }
-
-    @Override
-    public long currentTime() {
-      return -1;
-    }
-
-    @Override
-    public Object currentValue() {
-      return null;
-    }
-
-    @Override
-    public void reset() {}
-
-    @Override
-    public int totalLength() {
-      return TsBlock.this.getCount();
-    }
-
-    @Override
-    public boolean hasNextTimeValuePair() {
-      return hasNext();
-    }
-
-    @Override
-    public TimeValuePair nextTimeValuePair() {
-      return null;
-    }
-
-    @Override
-    public TimeValuePair currentTimeValuePair() {
-      return null;
-    }
-
-    @Override
-    public void close() {}
-  }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
new file mode 100644
index 0000000..9c38360
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Intermediate result for most of the operators: one {@link TimeColumn} plus zero or more value
+ * {@link Column}s that are expected to share the same position (row) count. The value columns
+ * can be series, aggregation results for one series or scalar values (such as deviceName).
+ *
+ * <p>The column array is not modified after construction; blocks are typically produced by
+ * {@link TsBlockBuilder}.
+ */
+public class TsBlock {
+
+  /** Instance size in bytes of a {@code TsBlock} object itself, as computed by JOL. */
+  public static final int INSTANCE_SIZE = ClassLayout.parseClass(TsBlock.class).instanceSize();
+
+  private static final Column[] EMPTY_COLUMNS = new Column[0];
+
+  /**
+   * Visible to give trusted classes like {@link TsBlockBuilder} access to a constructor that
+   * doesn't defensively copy the valueColumns
+   */
+  static TsBlock wrapBlocksWithoutCopy(
+      int positionCount, TimeColumn timeColumn, Column[] valueColumns) {
+    return new TsBlock(false, positionCount, timeColumn, valueColumns);
+  }
+
+  // TODO rethink about if we really need this field
+  // Describe the column info; nothing in this class assigns it yet, so it stays null
+  private TsBlockMetadata metadata;
+
+  private final TimeColumn timeColumn;
+
+  private final Column[] valueColumns;
+
+  private final int positionCount;
+
+  // lazily computed by updateRetainedSize(); a negative value means "not computed yet"
+  private volatile long retainedSizeInBytes = -1;
+
+  /**
+   * Creates a block whose position count is taken from the first value column.
+   *
+   * @throws IllegalArgumentException if {@code valueColumns} is empty
+   */
+  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+  }
+
+  /** Creates a block of {@code positionCount} rows with no time column and no value columns. */
+  public TsBlock(int positionCount) {
+    this(false, positionCount, null, EMPTY_COLUMNS);
+  }
+
+  /** Creates a block over a defensive copy of {@code valueColumns}. */
+  public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
+    this(true, positionCount, timeColumn, valueColumns);
+  }
+
+  private TsBlock(
+      boolean columnsCopyRequired,
+      int positionCount,
+      TimeColumn timeColumn,
+      Column[] valueColumns) {
+    requireNonNull(valueColumns, "blocks is null");
+    this.positionCount = positionCount;
+    this.timeColumn = timeColumn;
+    if (valueColumns.length == 0) {
+      this.valueColumns = EMPTY_COLUMNS;
+      // Empty blocks are not considered "retained" by any particular page
+      this.retainedSizeInBytes = INSTANCE_SIZE;
+    } else {
+      this.valueColumns = columnsCopyRequired ? valueColumns.clone() : valueColumns;
+    }
+  }
+
+  /** Placeholder: always returns {@code false}; use {@link #getTsBlockIterator()} for rows. */
+  public boolean hasNext() {
+    return false;
+  }
+
+  /** Placeholder: does nothing; use {@link #getTsBlockIterator()} to traverse rows. */
+  public void next() {}
+
+  public TsBlockMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  /** Returns the end time reported by the time column; requires a non-null time column. */
+  public long getEndTime() {
+    return timeColumn.getEndTime();
+  }
+
+  public boolean isEmpty() {
+    return positionCount == 0;
+  }
+
+  /** Returns the retained size in bytes, computing and caching it on first use. */
+  public long getRetainedSizeInBytes() {
+    long retainedSizeInBytes = this.retainedSizeInBytes;
+    if (retainedSizeInBytes < 0) {
+      return updateRetainedSize();
+    }
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * @param positionOffset start offset
+   * @param length slice length
+   * @return view of current TsBlock start from positionOffset to positionOffset + length
+   * @throws IndexOutOfBoundsException if the requested range is not inside this block
+   */
+  public TsBlock getRegion(int positionOffset, int length) {
+    // compare against the remaining length so that positionOffset + length cannot overflow
+    if (positionOffset < 0 || length < 0 || length > positionCount - positionOffset) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid position %s and length %s in page with %s positions",
+              positionOffset, length, positionCount));
+    }
+    int channelCount = getValueColumnCount();
+    Column[] slicedColumns = new Column[channelCount];
+    for (int i = 0; i < channelCount; i++) {
+      slicedColumns[i] = valueColumns[i].getRegion(positionOffset, length);
+    }
+    // a block created without a time column (see TsBlock(int)) stays without one
+    TimeColumn slicedTimeColumn =
+        timeColumn == null ? null : (TimeColumn) timeColumn.getRegion(positionOffset, length);
+    return wrapBlocksWithoutCopy(length, slicedTimeColumn, slicedColumns);
+  }
+
+  /**
+   * Returns a new block with {@code column} appended after the existing value columns.
+   *
+   * @throws IllegalArgumentException if the column's position count differs from this block's
+   */
+  public TsBlock appendValueColumn(Column column) {
+    requireNonNull(column, "column is null");
+    if (positionCount != column.getPositionCount()) {
+      throw new IllegalArgumentException("Block does not have same position count");
+    }
+
+    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
+    newBlocks[valueColumns.length] = column;
+    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+  }
+
+  public long getTimeByIndex(int index) {
+    return timeColumn.getLong(index);
+  }
+
+  public int getValueColumnCount() {
+    return valueColumns.length;
+  }
+
+  public TimeColumn getTimeColumn() {
+    return timeColumn;
+  }
+
+  public Column getColumn(int columnIndex) {
+    return valueColumns[columnIndex];
+  }
+
+  /** Returns an iterator over (time, first value column) pairs, starting at row 0. */
+  public TsBlockIterator getTsBlockIterator() {
+    return new TsBlockIterator(0);
+  }
+
+  /** Only used for the batch data of vector time series. */
+  public IBatchDataIterator getBatchDataIterator(int subIndex) {
+    return new AlignedTsBlockIterator(0, subIndex);
+  }
+
+  /** Row cursor over the time column and value column 0 of the enclosing block. */
+  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+
+    protected int index;
+
+    public TsBlockIterator(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < positionCount;
+    }
+
+    @Override
+    public boolean hasNext(long minBound, long maxBound) {
+      return hasNext();
+    }
+
+    @Override
+    public void next() {
+      index++;
+    }
+
+    @Override
+    public long currentTime() {
+      return timeColumn.getLong(index);
+    }
+
+    @Override
+    public Object currentValue() {
+      return valueColumns[0].getTsPrimitiveType(index).getValue();
+    }
+
+    @Override
+    public void reset() {
+      index = 0;
+    }
+
+    @Override
+    public int totalLength() {
+      return positionCount;
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      return hasNext();
+    }
+
+    @Override
+    public TimeValuePair nextTimeValuePair() {
+      TimeValuePair res = currentTimeValuePair();
+      next();
+      return res;
+    }
+
+    @Override
+    public TimeValuePair currentTimeValuePair() {
+      return new TimeValuePair(
+          timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** Cursor over one value column ({@code subIndex}) that skips rows whose value is null. */
+  private class AlignedTsBlockIterator extends TsBlockIterator {
+
+    private final int subIndex;
+
+    private AlignedTsBlockIterator(int index, int subIndex) {
+      super(index);
+      this.subIndex = subIndex;
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (super.hasNext() && currentValue() == null) {
+        super.next();
+      }
+      return super.hasNext();
+    }
+
+    @Override
+    public boolean hasNext(long minBound, long maxBound) {
+      while (super.hasNext() && currentValue() == null) {
+        if (currentTime() < minBound || currentTime() >= maxBound) {
+          break;
+        }
+        super.next();
+      }
+      return super.hasNext();
+    }
+
+    @Override
+    public Object currentValue() {
+      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+      return v == null ? null : v.getValue();
+    }
+
+    @Override
+    public int totalLength() {
+      // the block's position count is the length of the time column, which may include rows
+      // where this sub column is null, so walk the rows from the current cursor to count the
+      // non-null values and then restore the cursor
+      int cnt = 0;
+      int indexSave = index;
+      while (hasNext()) {
+        cnt++;
+        next();
+      }
+      index = indexSave;
+      return cnt;
+    }
+  }
+
+  private long updateRetainedSize() {
+    long retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueColumns);
+    // the time column is optional (it is null for blocks derived from TsBlock(int))
+    if (timeColumn != null) {
+      retainedSizeInBytes += timeColumn.getRetainedSizeInBytes();
+    }
+    for (Column column : valueColumns) {
+      retainedSizeInBytes += column.getRetainedSizeInBytes();
+    }
+    this.retainedSizeInBytes = retainedSizeInBytes;
+    return retainedSizeInBytes;
+  }
+
+  private static int determinePositionCount(Column... columns) {
+    requireNonNull(columns, "columns is null");
+    if (columns.length == 0) {
+      throw new IllegalArgumentException("columns is empty");
+    }
+
+    return columns[0].getPositionCount();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
new file mode 100644
index 0000000..9e615cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.*;
+
+import java.util.List;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+/**
+ * Accumulates rows into one time column builder plus one value column builder per declared type
+ * and assembles them into a {@link TsBlock}.
+ *
+ * <p>Values are written through {@link #getTimeColumnBuilder()} and {@link
+ * #getColumnBuilder(int)}; each completed row is recorded with {@link #declarePosition()}. {@link
+ * #build()} checks that every column holds exactly the declared number of positions.
+ */
+public class TsBlockBuilder {
+
+  // We choose default initial size to be 8 for TsBlockBuilder and ColumnBuilder
+  // so the underlying data is larger than the object overhead, and the size is power of 2.
+  //
+  // This could be any other small number.
+  private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
+
+  private TimeColumnBuilder timeColumnBuilder;
+  // Stays null for a builder obtained from createWithOnlyTimeColumn() until
+  // buildValueColumnBuilders() has been called.
+  private ColumnBuilder[] valueColumnBuilders;
+  private List<TSDataType> types;
+  private TsBlockBuilderStatus tsBlockBuilderStatus;
+  // Number of rows the caller has declared so far.
+  private int declaredPositions;
+
+  private TsBlockBuilder() {}
+
+  /**
+   * Create a TsBlockBuilder with given types.
+   *
+   * <p>A TsBlockBuilder instance created with this constructor has no estimation about bytes per
+   * entry, therefore it can resize frequently while appending new rows.
+   *
+   * <p>This constructor should only be used to get the initial TsBlockBuilder. Once the
+   * TsBlockBuilder is full use reset() or newTsBlockBuilderLike() to create a new TsBlockBuilder
+   * instance with its size estimated based on previous data.
+   *
+   * @param types data type of each value column, in channel order
+   * @throws NullPointerException if {@code types} is null
+   * @throws IllegalArgumentException if a type has no matching column builder
+   */
+  public TsBlockBuilder(List<TSDataType> types) {
+    this(DEFAULT_INITIAL_EXPECTED_ENTRIES, types);
+  }
+
+  /**
+   * Creates a builder with the default byte budget and the given initial capacity hint.
+   *
+   * @param initialExpectedEntries expected number of entries, handed to every column builder
+   * @param types data type of each value column, in channel order
+   */
+  public TsBlockBuilder(int initialExpectedEntries, List<TSDataType> types) {
+    this(initialExpectedEntries, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, types);
+  }
+
+  /**
+   * Creates a builder that only has a time column builder.
+   *
+   * <p>The value column builders are not created here; call {@link
+   * #buildValueColumnBuilders(List)} before using any method that touches them ({@link #reset()},
+   * {@link #getColumnBuilder(int)}, {@link #getRetainedSizeInBytes()}, {@link #build()}).
+   *
+   * @return a builder whose value columns still have to be declared
+   */
+  public static TsBlockBuilder createWithOnlyTimeColumn() {
+    TsBlockBuilder res = new TsBlockBuilder();
+    // TsBlockBuilderStatus takes a byte budget, not an entry count: passing the initial entry
+    // count here would cap the block at 8 bytes.
+    res.tsBlockBuilderStatus = new TsBlockBuilderStatus(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    res.timeColumnBuilder =
+        new TimeColumnBuilder(
+            res.tsBlockBuilderStatus.createColumnBuilderStatus(), DEFAULT_INITIAL_EXPECTED_ENTRIES);
+    return res;
+  }
+
+  /**
+   * Creates a builder with the default initial capacity and an explicit byte budget.
+   *
+   * @param maxTsBlockBytes byte budget after which {@link #isFull()} reports true
+   * @param types data type of each value column, in channel order
+   * @return the new builder
+   */
+  public static TsBlockBuilder withMaxTsBlockSize(int maxTsBlockBytes, List<TSDataType> types) {
+    return new TsBlockBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxTsBlockBytes, types);
+  }
+
+  private TsBlockBuilder(int initialExpectedEntries, int maxTsBlockBytes, List<TSDataType> types) {
+    this.types = requireNonNull(types, "types is null");
+
+    tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+    timeColumnBuilder =
+        new TimeColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), initialExpectedEntries);
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] = createValueColumnBuilder(types.get(i), initialExpectedEntries);
+    }
+  }
+
+  /** Creates a builder whose column builders are sized like the given template builders. */
+  private TsBlockBuilder(
+      int maxTsBlockBytes,
+      List<TSDataType> types,
+      TimeColumnBuilder templateTimeColumnBuilder,
+      ColumnBuilder[] templateValueColumnBuilders) {
+    this.types = requireNonNull(types, "types is null");
+
+    tsBlockBuilderStatus = new TsBlockBuilderStatus(maxTsBlockBytes);
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+
+    checkArgument(
+        templateValueColumnBuilders.length == types.size(),
+        "Size of templates and types should match");
+    timeColumnBuilder =
+        (TimeColumnBuilder)
+            templateTimeColumnBuilder.newColumnBuilderLike(
+                tsBlockBuilderStatus.createColumnBuilderStatus());
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] =
+          templateValueColumnBuilders[i].newColumnBuilderLike(
+              tsBlockBuilderStatus.createColumnBuilderStatus());
+    }
+  }
+
+  /**
+   * (Re)creates the value column builders for the given types. The number of positions already
+   * held by the time column builder is used as the capacity hint of the new builders.
+   *
+   * @param types data type of each value column, in channel order
+   * @throws NullPointerException if {@code types} is null
+   * @throws IllegalArgumentException if a type has no matching column builder
+   */
+  public void buildValueColumnBuilders(List<TSDataType> types) {
+    this.types = requireNonNull(types, "types is null");
+    valueColumnBuilders = new ColumnBuilder[types.size()];
+    int initialExpectedEntries = timeColumnBuilder.getPositionCount();
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] = createValueColumnBuilder(types.get(i), initialExpectedEntries);
+    }
+  }
+
+  /**
+   * Creates the value column builder matching {@code type}, registered with the current {@code
+   * tsBlockBuilderStatus}.
+   */
+  private ColumnBuilder createValueColumnBuilder(TSDataType type, int expectedEntries) {
+    // TODO use Type interface to encapsulate createColumnBuilder to each concrete type class
+    // instead of switch-case
+    switch (type) {
+      case BOOLEAN:
+        return new BooleanColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      case INT32:
+        return new IntColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      case INT64:
+        return new LongColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      case FLOAT:
+        return new FloatColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      case DOUBLE:
+        return new DoubleColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      case TEXT:
+        return new BinaryColumnBuilder(
+            tsBlockBuilderStatus.createColumnBuilderStatus(), expectedEntries);
+      default:
+        throw new IllegalArgumentException("Unknown data type: " + type);
+    }
+  }
+
+  /**
+   * Discards everything written so far and starts over with fresh column builders and a fresh
+   * status carrying the same byte budget. Does nothing when no position has been declared.
+   */
+  public void reset() {
+    if (isEmpty()) {
+      return;
+    }
+    tsBlockBuilderStatus =
+        new TsBlockBuilderStatus(tsBlockBuilderStatus.getMaxTsBlockSizeInBytes());
+
+    declaredPositions = 0;
+
+    // The time column has to be restarted together with the value columns; otherwise it keeps the
+    // old timestamps and build() rejects the next block because the counts no longer match.
+    timeColumnBuilder =
+        (TimeColumnBuilder)
+            timeColumnBuilder.newColumnBuilderLike(
+                tsBlockBuilderStatus.createColumnBuilderStatus());
+    for (int i = 0; i < valueColumnBuilders.length; i++) {
+      valueColumnBuilders[i] =
+          valueColumnBuilders[i].newColumnBuilderLike(
+              tsBlockBuilderStatus.createColumnBuilderStatus());
+    }
+  }
+
+  /**
+   * Creates an empty builder with the same types and byte budget, using this builder's column
+   * builders as templates.
+   *
+   * @return the new, independent builder
+   */
+  public TsBlockBuilder newTsBlockBuilderLike() {
+    return new TsBlockBuilder(
+        tsBlockBuilderStatus.getMaxTsBlockSizeInBytes(),
+        types,
+        timeColumnBuilder,
+        valueColumnBuilders);
+  }
+
+  /** Returns the builder of the time column. */
+  public TimeColumnBuilder getTimeColumnBuilder() {
+    return timeColumnBuilder;
+  }
+
+  /** Returns the builder of the value column at index {@code channel}. */
+  public ColumnBuilder getColumnBuilder(int channel) {
+    return valueColumnBuilders[channel];
+  }
+
+  /** Returns the declared data type of the value column at index {@code channel}. */
+  public TSDataType getType(int channel) {
+    return types.get(channel);
+  }
+
+  /** Records that one more row has been written. */
+  public void declarePosition() {
+    declaredPositions++;
+  }
+
+  /** Records that {@code positions} more rows have been written. */
+  public void declarePositions(int positions) {
+    declaredPositions += positions;
+  }
+
+  /**
+   * Returns true when the declared position count has reached {@code Integer.MAX_VALUE} or the
+   * status reports that the byte budget is used up.
+   */
+  public boolean isFull() {
+    return declaredPositions == Integer.MAX_VALUE || tsBlockBuilderStatus.isFull();
+  }
+
+  /** Returns true when no position has been declared. */
+  public boolean isEmpty() {
+    return declaredPositions == 0;
+  }
+
+  /** Returns the number of declared positions. */
+  public int getPositionCount() {
+    return declaredPositions;
+  }
+
+  /** Returns the byte count tracked by the status object. */
+  public long getSizeInBytes() {
+    return tsBlockBuilderStatus.getSizeInBytes();
+  }
+
+  /** Returns the sum of the retained sizes of the time and value column builders. */
+  public long getRetainedSizeInBytes() {
+    // We use a foreach loop instead of streams
+    // as it has much better performance.
+    long retainedSizeInBytes = timeColumnBuilder.getRetainedSizeInBytes();
+    for (ColumnBuilder columnBuilder : valueColumnBuilders) {
+      retainedSizeInBytes += columnBuilder.getRetainedSizeInBytes();
+    }
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * Builds the block from the current content of the column builders.
+   *
+   * <p>When there are no value columns the block is created from the declared position count
+   * alone.
+   *
+   * @return the assembled block
+   * @throws IllegalStateException if the time column or a value column does not hold exactly the
+   *     declared number of positions
+   */
+  public TsBlock build() {
+    if (valueColumnBuilders.length == 0) {
+      return new TsBlock(declaredPositions);
+    }
+    TimeColumn timeColumn = (TimeColumn) timeColumnBuilder.build();
+    if (timeColumn.getPositionCount() != declaredPositions) {
+      throw new IllegalStateException(
+          format(
+              "Declared positions (%s) does not match time column's number of entries (%s)",
+              declaredPositions, timeColumn.getPositionCount()));
+    }
+
+    Column[] columns = new Column[valueColumnBuilders.length];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = valueColumnBuilders[i].build();
+      if (columns[i].getPositionCount() != declaredPositions) {
+        throw new IllegalStateException(
+            format(
+                "Declared positions (%s) does not match column %s's number of entries (%s)",
+                declaredPositions, i, columns[i].getPositionCount()));
+      }
+    }
+
+    return TsBlock.wrapBlocksWithoutCopy(declaredPositions, timeColumn, columns);
+  }
+
+  private static void checkArgument(boolean expression, String errorMessage) {
+    if (!expression) {
+      throw new IllegalArgumentException(errorMessage);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
new file mode 100644
index 0000000..a48e9cd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilderStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilderStatus;
+
+/**
+ * Keeps a running total of the bytes reported for a block under construction and compares it
+ * against a fixed byte budget.
+ */
+public class TsBlockBuilderStatus {
+
+  /** Byte budget used when none is given: one mebibyte. */
+  public static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = 1 << 20;
+
+  private final int maxTsBlockSizeInBytes;
+
+  // Sum of all byte counts passed to addBytes().
+  private long currentSize;
+
+  /** Creates a status with the default byte budget. */
+  public TsBlockBuilderStatus() {
+    this(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+  }
+
+  /**
+   * Creates a status with an explicit byte budget.
+   *
+   * @param maxTsBlockSizeInBytes total at which {@link #isFull()} starts reporting true
+   */
+  public TsBlockBuilderStatus(int maxTsBlockSizeInBytes) {
+    this.maxTsBlockSizeInBytes = maxTsBlockSizeInBytes;
+  }
+
+  /** Creates a column-level status that is constructed with a reference to this object. */
+  public ColumnBuilderStatus createColumnBuilderStatus() {
+    return new ColumnBuilderStatus(this);
+  }
+
+  /** Returns the byte budget this status was created with. */
+  public int getMaxTsBlockSizeInBytes() {
+    return maxTsBlockSizeInBytes;
+  }
+
+  /** Returns true while no bytes have been recorded. */
+  public boolean isEmpty() {
+    return 0 == currentSize;
+  }
+
+  /** Returns true once the recorded bytes have reached the budget. */
+  public boolean isFull() {
+    return maxTsBlockSizeInBytes <= currentSize;
+  }
+
+  /**
+   * Adds {@code bytes} to the running total.
+   *
+   * @param bytes number of bytes to record; must not be negative
+   * @throws IllegalArgumentException if {@code bytes} is negative
+   */
+  public void addBytes(int bytes) {
+    if (bytes >= 0) {
+      currentSize += bytes;
+      return;
+    }
+    throw new IllegalArgumentException("bytes cannot be negative");
+  }
+
+  /** Returns the running total of recorded bytes. */
+  public long getSizeInBytes() {
+    return currentSize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TsBlockBuilderStatus{");
+    text.append("maxTsBlockSizeInBytes=").append(maxTsBlockSizeInBytes);
+    text.append(", currentSize=").append(currentSize);
+    return text.append('}').toString();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
similarity index 97%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
index 7d3a5dd..0f16f1e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TsBlockMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
+package org.apache.iotdb.tsfile.read.common.block;
 
 import java.util.List;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
new file mode 100644
index 0000000..cfb22c9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * {@link Column} of {@link Binary} values backed by a value array and an optional null mask.
+ *
+ * <p>The arrays are used as passed in, without copying. {@code arrayOffset} is added to every
+ * position before the arrays are indexed, which lets {@link #getRegion(int, int)} return a window
+ * onto the same arrays.
+ */
+public class BinaryColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BinaryColumn.class).instanceSize();
+
+  private final int arrayOffset;
+  private final int positionCount;
+  // null means that no position of this column is null
+  private final boolean[] valueIsNull;
+  private final Binary[] values;
+
+  private final long retainedSizeInBytes;
+
+  /**
+   * Creates a column that starts at index 0 of the given arrays.
+   *
+   * @param positionCount number of positions in the column
+   * @param valueIsNull null mask, or empty when the column has no null position
+   * @param values backing value array; must hold at least {@code positionCount} entries
+   */
+  public BinaryColumn(int positionCount, Optional<boolean[]> valueIsNull, Binary[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  /**
+   * Creates a column over {@code positionCount} entries starting at {@code arrayOffset}.
+   *
+   * @throws IllegalArgumentException if an argument is negative or an array is too short for the
+   *     requested window
+   */
+  BinaryColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, Binary[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    // TODO we need to sum up all the Binary's retainedSize here
+    // The null mask is optional (the builder passes null when no null was written), so it only
+    // contributes when present.
+    // NOTE(review): this avoids handing a null reference to VMSupport.sizeOf - confirm how the
+    // JOL version in use treats null.
+    retainedSizeInBytes =
+        INSTANCE_SIZE + (valueIsNull == null ? 0 : sizeOf(valueIsNull)) + sizeOf(values);
+  }
+
+  @Override
+  public Binary getBinary(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  /** Wraps the value at {@code position} in a new {@code TsPrimitiveType.TsBinary}. */
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsBinary(getBinary(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /** Returns a column over {@code length} positions of the same arrays, without copying. */
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
new file mode 100644
index 0000000..c5c629f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * {@link ColumnBuilder} for {@link Binary} values. Values and null flags are kept in two parallel
+ * arrays that are allocated on the first write and grown on demand.
+ *
+ * <p>NOTE(review): unlike {@code BooleanColumnBuilder}, the write methods here never report bytes
+ * to {@code columnBuilderStatus}, so this column does not contribute to the block's tracked size.
+ */
+public class BinaryColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BinaryColumnBuilder.class).instanceSize();
+  // Single null entry that build() repeats when only nulls were appended.
+  private static final BinaryColumn NULL_VALUE_BLOCK =
+      new BinaryColumn(0, 1, new boolean[] {true}, new Binary[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private Binary[] values = new Binary[0];
+
+  private long arraysRetainedSizeInBytes;
+
+  /**
+   * @param columnBuilderStatus status object of the enclosing block builder; may be null
+   * @param expectedEntries capacity allocated on the first write (at least 1)
+   */
+  public BinaryColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.initialEntryCount = max(expectedEntries, 1);
+    this.columnBuilderStatus = columnBuilderStatus;
+    updateArraysDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeBinary(Binary value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeBinary(value.getBinary());
+  }
+
+  /**
+   * Appends one entry per timestamp held by {@code timeBuilder}: the source value when the source
+   * row at the running index carries the same timestamp, a null otherwise.
+   *
+   * <p>The source is read through the {@link Column} interface, so it does not have to be a
+   * {@code BinaryColumn}. Null source positions are appended as nulls, and once the source rows
+   * are used up the remaining timestamps are filled with nulls instead of reading past the end.
+   *
+   * @param timeColumn timestamps of the source rows
+   * @param valueColumn values of the source rows
+   * @param offset index of the first source row to consider
+   * @param timeBuilder builder holding the timestamps to align with
+   * @return index of the first source row that was not consumed
+   */
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int sourceCount = timeColumn.getPositionCount();
+    int index = offset;
+    for (int i = 0; i < count; i++) {
+      if (index < sourceCount && timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        if (valueColumn.isNull(index)) {
+          appendNull();
+        } else {
+          writeBinary(valueColumn.getBinary(index));
+        }
+        index++;
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    return this;
+  }
+
+  /**
+   * Builds the column. When no non-null value was written, a run-length encoded column repeating
+   * a single null entry is returned; the null mask is dropped when no null was appended.
+   */
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new BinaryColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    // TODO we need to sum up all the Binary's retainedSize here
+    long size = INSTANCE_SIZE + arraysRetainedSizeInBytes;
+    if (columnBuilderStatus != null) {
+      size += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+    return size;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    // TODO we should take retain size into account here
+    return new BinaryColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  /** Allocates the initial capacity on first use, afterwards grows both arrays together. */
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateArraysDataSize();
+  }
+
+  private void updateArraysDataSize() {
+    arraysRetainedSizeInBytes = sizeOf(valueIsNull) + sizeOf(values);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
new file mode 100644
index 0000000..a2b8550
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * {@link Column} of boolean values backed by a value array and an optional null mask.
+ *
+ * <p>The arrays are used as passed in, without copying. {@code arrayOffset} is added to every
+ * position before the arrays are indexed, which lets {@link #getRegion(int, int)} return a window
+ * onto the same arrays.
+ */
+public class BooleanColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BooleanColumn.class).instanceSize();
+  /** Bytes accounted per position: two {@code Byte.BYTES}. */
+  public static final int SIZE_IN_BYTES_PER_POSITION = Byte.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  // null means that no position of this column is null
+  private final boolean[] valueIsNull;
+  private final boolean[] values;
+
+  private final long retainedSizeInBytes;
+
+  /**
+   * Creates a column that starts at index 0 of the given arrays.
+   *
+   * @param positionCount number of positions in the column
+   * @param valueIsNull null mask, or empty when the column has no null position
+   * @param values backing value array; must hold at least {@code positionCount} entries
+   */
+  public BooleanColumn(int positionCount, Optional<boolean[]> valueIsNull, boolean[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  /**
+   * Creates a column over {@code positionCount} entries starting at {@code arrayOffset}.
+   *
+   * @throws IllegalArgumentException if an argument is negative or an array is too short for the
+   *     requested window
+   */
+  BooleanColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, boolean[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    // The null mask is optional (the builder passes null when no null was written), so it only
+    // contributes when present.
+    // NOTE(review): this avoids handing a null reference to VMSupport.sizeOf - confirm how the
+    // JOL version in use treats null.
+    retainedSizeInBytes =
+        INSTANCE_SIZE + (valueIsNull == null ? 0 : sizeOf(valueIsNull)) + sizeOf(values);
+  }
+
+  @Override
+  public boolean getBoolean(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  /** Wraps the value at {@code position} in a new {@code TsPrimitiveType.TsBoolean}. */
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsBoolean(getBoolean(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /** Returns a column over {@code length} positions of the same arrays, without copying. */
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
new file mode 100644
index 0000000..5e7629d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * {@link ColumnBuilder} for boolean values. Values and null flags are kept in two parallel arrays
+ * that are allocated on the first write and grown on demand; every appended position is reported
+ * to the {@code ColumnBuilderStatus}, when one is present.
+ */
+public class BooleanColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(BooleanColumnBuilder.class).instanceSize();
+  // Single null entry that build() repeats when only nulls were appended.
+  private static final BooleanColumn NULL_VALUE_BLOCK =
+      new BooleanColumn(0, 1, new boolean[] {true}, new boolean[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private boolean[] values = new boolean[0];
+
+  private long retainedSizeInBytes;
+
+  /**
+   * @param columnBuilderStatus status object of the enclosing block builder; may be null
+   * @param expectedEntries capacity allocated on the first write (at least 1)
+   */
+  public BooleanColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeBoolean(boolean value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeBoolean(value.getBoolean());
+  }
+
+  /**
+   * Appends one entry per timestamp held by {@code timeBuilder}: the source value when the source
+   * row at the running index carries the same timestamp, a null otherwise.
+   *
+   * <p>The source is read through the {@link Column} interface, so it does not have to be a
+   * {@code BooleanColumn}. Null source positions are appended as nulls, and once the source rows
+   * are used up the remaining timestamps are filled with nulls instead of reading past the end.
+   *
+   * @param timeColumn timestamps of the source rows
+   * @param valueColumn values of the source rows
+   * @param offset index of the first source row to consider
+   * @param timeBuilder builder holding the timestamps to align with
+   * @return index of the first source row that was not consumed
+   */
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int sourceCount = timeColumn.getPositionCount();
+    int index = offset;
+    for (int i = 0; i < count; i++) {
+      if (index < sourceCount && timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        if (valueColumn.isNull(index)) {
+          appendNull();
+        } else {
+          writeBoolean(valueColumn.getBoolean(index));
+        }
+        index++;
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(BooleanColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /**
+   * Builds the column. When no non-null value was written, a run-length encoded column repeating
+   * a single null entry is returned; the null mask is dropped when no null was appended.
+   */
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new BooleanColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new BooleanColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  /** Allocates the initial capacity on first use, afterwards grows both arrays together. */
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  /** Recomputes the cached retained size from the instance, both arrays and the status. */
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
new file mode 100644
index 0000000..adc06a1
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface Column {
+
+  /** Gets a boolean at {@code position}; unsupported unless overridden. */
+  default boolean getBoolean(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets an int at {@code position}; unsupported unless overridden. */
+  default int getInt(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a long at {@code position}; unsupported unless overridden. */
+  default long getLong(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a float at {@code position}; unsupported unless overridden. */
+  default float getFloat(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a double at {@code position}; unsupported unless overridden. */
+  default double getDouble(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a Binary at {@code position}; unsupported unless overridden. */
+  default Binary getBinary(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Gets a TsPrimitiveType at {@code position}; unsupported unless overridden. */
+  default TsPrimitiveType getTsPrimitiveType(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * Is the specified position null?
+   *
+   * @throws IllegalArgumentException if this position is not valid. The method may return false
+   *     without throwing an exception when the column has no nulls, even if the position is
+   *     invalid
+   */
+  boolean isNull(int position);
+
+  /** Returns the number of positions in this column. */
+  int getPositionCount();
+
+  /**
+   * Returns the retained size of this column in memory, including over-allocations. This method is
+   * called from the inner most execution loop and must be fast.
+   */
+  long getRetainedSizeInBytes();
+
+  /**
+   * Returns a column starting at the specified position and extends for the specified length. The
+   * specified region must be entirely contained within this column.
+   *
+   * <p>The region can be a view over this column. If this column is released, the region column may
+   * also be released. If the region column is released, this column may also be released.
+   */
+  Column getRegion(int positionOffset, int length);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
new file mode 100644
index 0000000..a16e5fa
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public interface ColumnBuilder {
+
+  /** Writes a boolean to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeBoolean(boolean value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes an int to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeInt(int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes a long to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeLong(long value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes a float to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeFloat(float value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes a double to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeDouble(double value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes a Binary to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeBinary(Binary value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Writes a TsPrimitiveType to the current entry; unsupported unless overridden. */
+  default ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder);
+
+  /** Appends a null value to the column. */
+  ColumnBuilder appendNull();
+
+  /** Builds the column. This method can be called multiple times. */
+  Column build();
+
+  /**
+   * Returns the retained size of this column in memory, including over-allocations. This method is
+   * called from the inner most execution loop and must be fast.
+   */
+  long getRetainedSizeInBytes();
+
+  /**
+   * Creates a new column builder of the same type based on the current usage statistics of this
+   * column builder.
+   */
+  ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus);
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
new file mode 100644
index 0000000..d0f62a2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnBuilderStatus {
+
+  public static final int INSTANCE_SIZE = deepInstanceSize(ColumnBuilderStatus.class);
+
+  private final TsBlockBuilderStatus tsBlockBuilderStatus;
+
+  private int currentSize;
+
+  public ColumnBuilderStatus(TsBlockBuilderStatus tsBlockBuilderStatus) {
+    this.tsBlockBuilderStatus =
+        requireNonNull(tsBlockBuilderStatus, "tsBlockBuilderStatus must not be null");
+  }
+
+  public int getMaxTsBlockSizeInBytes() {
+    return tsBlockBuilderStatus.getMaxTsBlockSizeInBytes();
+  }
+
+  public void addBytes(int bytes) {
+    currentSize += bytes; // this column's own running total
+    tsBlockBuilderStatus.addBytes(bytes); // also forwarded to the enclosing TsBlock status
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnBuilderStatus{currentSize=" + currentSize + '}';
+  }
+
+  /**
+   * Sums the instance size of {@code clazz} and, recursively, of each non-primitive field type.
+   */
+  private static int deepInstanceSize(Class<?> clazz) {
+    if (clazz.isArray()) {
+      throw new IllegalArgumentException(
+          format(
+              "Cannot determine size of %s because it contains an array", clazz.getSimpleName()));
+    }
+    if (clazz.isInterface()) {
+      throw new IllegalArgumentException(format("%s is an interface", clazz.getSimpleName()));
+    }
+    if (Modifier.isAbstract(clazz.getModifiers())) {
+      throw new IllegalArgumentException(format("%s is abstract", clazz.getSimpleName()));
+    }
+    if (!clazz.getSuperclass().equals(Object.class)) {
+      throw new IllegalArgumentException(
+          format(
+              "Cannot determine size of a subclass. %s extends from %s",
+              clazz.getSimpleName(), clazz.getSuperclass().getSimpleName()));
+    }
+
+    int size = ClassLayout.parseClass(clazz).instanceSize();
+    for (Field field : clazz.getDeclaredFields()) {
+      if (!field.getType().isPrimitive()) {
+        size += deepInstanceSize(field.getType());
+      }
+    }
+    return size;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
new file mode 100644
index 0000000..150f713
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import static java.lang.Math.ceil;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class ColumnUtil {
+
+  private static final double BLOCK_RESET_SKEW = 1.25;
+
+  private static final int DEFAULT_CAPACITY = 64;
+  // See java.util.ArrayList for an explanation
+  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private ColumnUtil() {}
+
+  static void checkArrayRange(int[] array, int offset, int length) {
+    requireNonNull(array, "array is null");
+    if (offset < 0 || length < 0 || offset + length > array.length) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid offset %s and length %s in array with %s elements",
+              offset, length, array.length));
+    }
+  }
+
+  static void checkValidRegion(int positionCount, int positionOffset, int length) {
+    if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid position %s and length %s in block with %s positions",
+              positionOffset, length, positionCount));
+    }
+  }
+
+  static void checkValidPositions(boolean[] positions, int positionCount) {
+    if (positions.length != positionCount) {
+      throw new IllegalArgumentException(
+          format(
+              "Invalid positions array size %d, actual position count is %d",
+              positions.length, positionCount));
+    }
+  }
+
+  static void checkValidPosition(int position, int positionCount) {
+    if (position < 0 || position >= positionCount) {
+      throw new IllegalArgumentException(
+          format("Invalid position %s in block with %s positions", position, positionCount));
+    }
+  }
+
+  static int calculateNewArraySize(int currentSize) {
+    // grow array by 50%
+    long newSize = (long) currentSize + (currentSize >> 1);
+
+    // verify new size is within reasonable bounds
+    if (newSize < DEFAULT_CAPACITY) {
+      newSize = DEFAULT_CAPACITY;
+    } else if (newSize > MAX_ARRAY_SIZE) {
+      newSize = MAX_ARRAY_SIZE;
+      if (newSize == currentSize) {
+        throw new IllegalArgumentException(format("Cannot grow array beyond '%s'", MAX_ARRAY_SIZE));
+      }
+    }
+    return (int) newSize;
+  }
+
+  static int calculateBlockResetSize(int currentSize) {
+    long newSize = (long) ceil(currentSize * BLOCK_RESET_SKEW);
+
+    // verify new size is within reasonable bounds
+    if (newSize < DEFAULT_CAPACITY) {
+      newSize = DEFAULT_CAPACITY;
+    } else if (newSize > MAX_ARRAY_SIZE) {
+      newSize = MAX_ARRAY_SIZE;
+    }
+    return (int) newSize;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
new file mode 100644
index 0000000..d16e0c0
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(DoubleColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Double.BYTES + Byte.BYTES;
+
+  private final int arrayOffset; // index of position 0 inside the backing arrays
+  private final int positionCount;
+  private final boolean[] valueIsNull; // null when the column holds no null entries
+  private final double[] values;
+
+  private final long retainedSizeInBytes;
+
+  public DoubleColumn(int positionCount, Optional<boolean[]> valueIsNull, double[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  DoubleColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, double[] values) {
+    // validate every argument before any field is assigned
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    if (positionCount > values.length - arrayOffset) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    if (valueIsNull != null && positionCount > valueIsNull.length - arrayOffset) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+
+    // the arrays are kept by reference, not copied
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public double getDouble(int position) {
+    checkReadablePosition(position);
+    return values[arrayOffset + position];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsDouble(getDouble(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return null != valueIsNull && valueIsNull[arrayOffset + position];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new DoubleColumn(arrayOffset + positionOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || getPositionCount() <= position) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
new file mode 100644
index 0000000..a01f6f2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class DoubleColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(DoubleColumnBuilder.class).instanceSize();
+  private static final DoubleColumn NULL_VALUE_BLOCK =
+      new DoubleColumn(0, 1, new boolean[] {true}, new double[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // the null mask and the value array are always resized together, so their lengths match
+  private boolean[] valueIsNull = new boolean[0];
+  private double[] values = new double[0];
+
+  private long retainedSizeInBytes;
+
+  public DoubleColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.initialEntryCount = max(1, expectedEntries);
+    this.columnBuilderStatus = columnBuilderStatus;
+    // the arrays are still empty here; this only seeds the retained-size figure
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeDouble(double value) {
+    // make room first: growCapacity() resizes both backing arrays
+    if (positionCount >= values.length) {
+      growCapacity();
+    }
+    // store the value, then advance the write cursor
+    values[positionCount++] = value;
+    hasNonNullValue = true;
+
+    if (null != columnBuilderStatus) {
+      columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeDouble(value.getDouble());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    final int rowCount = timeBuilder.getPositionCount();
+    final DoubleColumn source = (DoubleColumn) valueColumn;
+    int cursor = offset;
+    for (int row = 0; row < rowCount; row++) {
+      if (timeColumn.getLong(cursor) != timeBuilder.getTime(row)) {
+        appendNull();
+      } else {
+        writeDouble(source.getDouble(cursor++));
+      }
+    }
+    return cursor;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    // make room first: growCapacity() resizes both backing arrays
+    if (positionCount >= values.length) {
+      growCapacity();
+    }
+    // flag the slot as null, then advance the write cursor
+    valueIsNull[positionCount++] = true;
+    hasNullValue = true;
+
+    if (null != columnBuilderStatus) {
+      columnBuilderStatus.addBytes(DoubleColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (hasNonNullValue) {
+      return new DoubleColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+    }
+    return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new DoubleColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    // the first growth jumps straight to the initial entry count; later ones expand the arrays
+    final int capacity =
+        initialized ? ColumnUtil.calculateNewArraySize(values.length) : initialEntryCount;
+    initialized = true;
+
+    // the null mask and the value array are resized to the same length
+    valueIsNull = Arrays.copyOf(valueIsNull, capacity);
+    values = Arrays.copyOf(values, capacity);
+
+    // the retained-size figure depends on both arrays
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    // builder instance plus both backing arrays, plus the status object when one is attached
+    long arrays = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    long status = columnBuilderStatus == null ? 0 : ColumnBuilderStatus.INSTANCE_SIZE;
+    retainedSizeInBytes = arrays + status;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
new file mode 100644
index 0000000..6543bde
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(FloatColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Float.BYTES + Byte.BYTES;
+
+  private final int arrayOffset; // index of position 0 inside the backing arrays
+  private final int positionCount;
+  private final boolean[] valueIsNull; // null when the column holds no null entries
+  private final float[] values;
+
+  private final long retainedSizeInBytes;
+
+  public FloatColumn(int positionCount, Optional<boolean[]> valueIsNull, float[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  FloatColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, float[] values) {
+    // validate every argument before any field is assigned
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    if (positionCount > values.length - arrayOffset) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    if (valueIsNull != null && positionCount > valueIsNull.length - arrayOffset) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+
+    // the arrays are kept by reference, not copied
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public float getFloat(int position) {
+    checkReadablePosition(position);
+    return values[arrayOffset + position];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsFloat(getFloat(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return null != valueIsNull && valueIsNull[arrayOffset + position];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new FloatColumn(arrayOffset + positionOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || getPositionCount() <= position) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
new file mode 100644
index 0000000..ff89ea2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class FloatColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(FloatColumnBuilder.class).instanceSize();
+  private static final FloatColumn NULL_VALUE_BLOCK =
+      new FloatColumn(0, 1, new boolean[] {true}, new float[1]);
+
+  private final ColumnBuilderStatus columnBuilderStatus;
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // the null mask and the value array are always resized together, so their lengths match
+  private boolean[] valueIsNull = new boolean[0];
+  private float[] values = new float[0];
+
+  private long retainedSizeInBytes;
+
+  public FloatColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.initialEntryCount = max(1, expectedEntries);
+    this.columnBuilderStatus = columnBuilderStatus;
+    // the arrays are still empty here; this only seeds the retained-size figure
+    updateDataSize();
+  }
+
+  @Override
+  public ColumnBuilder writeFloat(float value) {
+    // make room first: growCapacity() resizes both backing arrays
+    if (positionCount >= values.length) {
+      growCapacity();
+    }
+    // store the value, then advance the write cursor
+    values[positionCount++] = value;
+    hasNonNullValue = true;
+
+    if (null != columnBuilderStatus) {
+      columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeFloat(value.getFloat());
+  }
+
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    final int rowCount = timeBuilder.getPositionCount();
+    final FloatColumn source = (FloatColumn) valueColumn;
+    int cursor = offset;
+    for (int row = 0; row < rowCount; row++) {
+      if (timeColumn.getLong(cursor) != timeBuilder.getTime(row)) {
+        appendNull();
+      } else {
+        writeFloat(source.getFloat(cursor++));
+      }
+    }
+    return cursor;
+  }
+
+  @Override
+  public ColumnBuilder appendNull() {
+    // make room first: growCapacity() resizes both backing arrays
+    if (positionCount >= values.length) {
+      growCapacity();
+    }
+    // flag the slot as null, then advance the write cursor
+    valueIsNull[positionCount++] = true;
+    hasNullValue = true;
+
+    if (null != columnBuilderStatus) {
+      columnBuilderStatus.addBytes(FloatColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  @Override
+  public Column build() {
+    if (hasNonNullValue) {
+      return new FloatColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+    }
+    return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new FloatColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  private void growCapacity() {
+    // the first growth jumps straight to the initial entry count; later ones expand the arrays
+    final int capacity =
+        initialized ? ColumnUtil.calculateNewArraySize(values.length) : initialEntryCount;
+    initialized = true;
+
+    // the null mask and the value array are resized to the same length
+    valueIsNull = Arrays.copyOf(valueIsNull, capacity);
+    values = Arrays.copyOf(values, capacity);
+
+    // the retained-size figure depends on both arrays
+    updateDataSize();
+  }
+
+  private void updateDataSize() {
+    // builder instance plus both backing arrays, plus the status object when one is attached
+    long arrays = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    long status = columnBuilderStatus == null ? 0 : ColumnBuilderStatus.INSTANCE_SIZE;
+    retainedSizeInBytes = arrays + status;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
new file mode 100644
index 0000000..4ad843f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class IntColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(IntColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final int[] values;
+
+  private final long retainedSizeInBytes;
+
+  public IntColumn(int positionCount, Optional<boolean[]> valueIsNull, int[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  IntColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, int[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public int getInt(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsInt(getInt(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
new file mode 100644
index 0000000..9ab3f7f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Builder that accumulates {@code int} values and null markers in growable arrays and turns them
+ * into an {@link IntColumn}, or into a {@link RunLengthEncodedColumn} of nulls when no non-null
+ * value was written.
+ */
+public class IntColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(IntColumnBuilder.class).instanceSize();
+  // single-position all-null column that build() repeats when only nulls were appended
+  private static final IntColumn NULL_VALUE_BLOCK =
+      new IntColumn(0, 1, new boolean[] {true}, new int[1]);
+
+  // may be null; when present it is told how many bytes each appended position accounts for
+  private final ColumnBuilderStatus columnBuilderStatus;
+  // false until the first growCapacity() call has allocated the arrays
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private int[] values = new int[0];
+
+  private long retainedSizeInBytes;
+
+  /**
+   * @param columnBuilderStatus status object notified of appended bytes, may be null
+   * @param expectedEntries size of the first array allocation; values below 1 are raised to 1
+   */
+  public IntColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  /**
+   * Appends a non-null value at the next position.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder writeInt(int value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /**
+   * Appends the int carried by {@code value}; {@code value} must not be null.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeInt(value.getInt());
+  }
+
+  /**
+   * Appends one entry per timestamp currently held by {@code timeBuilder}: the source value when
+   * the source timestamp at the read cursor equals that timestamp, a null otherwise. The read
+   * cursor starts at {@code offset} and advances on every match.
+   *
+   * <p>Once the cursor has passed the last position of {@code timeColumn}, the remaining
+   * timestamps are filled with nulls instead of reading past the end of the source. A matched
+   * position that is null in {@code valueColumn} is appended as null rather than as the int
+   * stored in its slot. Values are read through the {@link Column} interface, so the source does
+   * not have to be an {@link IntColumn} instance (for example it may be the all-null
+   * run-length encoded column returned by {@link #build()}).
+   *
+   * @param timeColumn timestamps of the source rows
+   * @param valueColumn values of the source rows, read at the same positions as the timestamps
+   * @param offset first source position to read
+   * @param timeBuilder builder holding the target timestamps
+   * @return the source position following the last consumed one
+   */
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int sourceCount = timeColumn.getPositionCount();
+    int index = offset;
+    for (int i = 0; i < count; i++) {
+      // the bound check comes first so an exhausted source is never dereferenced
+      if (index < sourceCount && timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        if (valueColumn.isNull(index)) {
+          appendNull();
+        } else {
+          writeInt(valueColumn.getInt(index));
+        }
+        index++;
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  /**
+   * Appends a null entry at the next position.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(IntColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /**
+   * Builds a column from the positions written so far.
+   *
+   * @return a run-length encoded column of nulls when no non-null value was written, otherwise
+   *     an {@link IntColumn} created from this builder's arrays (the null flags are only passed
+   *     along when at least one null was appended)
+   */
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new IntColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * Creates an empty builder of the same kind, sized from the number of positions this builder
+   * currently holds.
+   */
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new IntColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  /**
+   * Enlarges both backing arrays: to initialEntryCount on the first call, afterwards to the size
+   * ColumnUtil derives from the current length.
+   */
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  /** Recomputes the retained size from this instance, both arrays and the optional status. */
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
new file mode 100644
index 0000000..9ab9edd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+public class LongColumn implements Column {
+
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES + Byte.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final boolean[] valueIsNull;
+  private final long[] values;
+
+  private final long retainedSizeInBytes;
+
+  public LongColumn(int positionCount, Optional<boolean[]> valueIsNull, long[] values) {
+    this(0, positionCount, valueIsNull.orElse(null), values);
+  }
+
+  LongColumn(int arrayOffset, int positionCount, boolean[] valueIsNull, long[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    if (valueIsNull != null && valueIsNull.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("isNull length is less than positionCount");
+    }
+    this.valueIsNull = valueIsNull;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+  }
+
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return new TsPrimitiveType.TsLong(getLong(position));
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return valueIsNull != null && valueIsNull[position + arrayOffset];
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
new file mode 100644
index 0000000..9053913
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Builder that accumulates {@code long} values and null markers in growable arrays and turns
+ * them into a {@link LongColumn}, or into a {@link RunLengthEncodedColumn} of nulls when no
+ * non-null value was written.
+ */
+public class LongColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(LongColumnBuilder.class).instanceSize();
+  // single-position all-null column that build() repeats when only nulls were appended
+  private static final LongColumn NULL_VALUE_BLOCK =
+      new LongColumn(0, 1, new boolean[] {true}, new long[1]);
+
+  // may be null; when present it is told how many bytes each appended position accounts for
+  private final ColumnBuilderStatus columnBuilderStatus;
+  // false until the first growCapacity() call has allocated the arrays
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+  private boolean hasNullValue;
+  private boolean hasNonNullValue;
+
+  // it is assumed that these arrays are the same length
+  private boolean[] valueIsNull = new boolean[0];
+  private long[] values = new long[0];
+
+  private long retainedSizeInBytes;
+
+  /**
+   * @param columnBuilderStatus status object notified of appended bytes, may be null
+   * @param expectedEntries size of the first array allocation; values below 1 are raised to 1
+   */
+  public LongColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  /**
+   * Appends a non-null value at the next position.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder writeLong(long value) {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    values[positionCount] = value;
+
+    hasNonNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /**
+   * Appends the long carried by {@code value}; {@code value} must not be null.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
+    return writeLong(value.getLong());
+  }
+
+  /**
+   * Appends one entry per timestamp currently held by {@code timeBuilder}: the source value when
+   * the source timestamp at the read cursor equals that timestamp, a null otherwise. The read
+   * cursor starts at {@code offset} and advances on every match.
+   *
+   * <p>Once the cursor has passed the last position of {@code timeColumn}, the remaining
+   * timestamps are filled with nulls instead of reading past the end of the source. A matched
+   * position that is null in {@code valueColumn} is appended as null rather than as the long
+   * stored in its slot. Values are read through the {@link Column} interface, so the source does
+   * not have to be a {@link LongColumn} instance (for example it may be the all-null
+   * run-length encoded column returned by {@link #build()}).
+   *
+   * @param timeColumn timestamps of the source rows
+   * @param valueColumn values of the source rows, read at the same positions as the timestamps
+   * @param offset first source position to read
+   * @param timeBuilder builder holding the target timestamps
+   * @return the source position following the last consumed one
+   */
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    int count = timeBuilder.getPositionCount();
+    int sourceCount = timeColumn.getPositionCount();
+    int index = offset;
+    for (int i = 0; i < count; i++) {
+      // the bound check comes first so an exhausted source is never dereferenced
+      if (index < sourceCount && timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+        if (valueColumn.isNull(index)) {
+          appendNull();
+        } else {
+          writeLong(valueColumn.getLong(index));
+        }
+        index++;
+      } else {
+        appendNull();
+      }
+    }
+    return index;
+  }
+
+  /**
+   * Appends a null entry at the next position.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder appendNull() {
+    if (values.length <= positionCount) {
+      growCapacity();
+    }
+
+    valueIsNull[positionCount] = true;
+
+    hasNullValue = true;
+    positionCount++;
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /**
+   * Builds a column from the positions written so far.
+   *
+   * @return a run-length encoded column of nulls when no non-null value was written, otherwise
+   *     a {@link LongColumn} created from this builder's arrays (the null flags are only passed
+   *     along when at least one null was appended)
+   */
+  @Override
+  public Column build() {
+    if (!hasNonNullValue) {
+      return new RunLengthEncodedColumn(NULL_VALUE_BLOCK, positionCount);
+    }
+    return new LongColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * Creates an empty builder of the same kind, sized from the number of positions this builder
+   * currently holds.
+   */
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    return new LongColumnBuilder(columnBuilderStatus, calculateBlockResetSize(positionCount));
+  }
+
+  /**
+   * Enlarges both backing arrays: to initialEntryCount on the first call, afterwards to the size
+   * ColumnUtil derives from the current length.
+   */
+  private void growCapacity() {
+    int newSize;
+    if (initialized) {
+      newSize = ColumnUtil.calculateNewArraySize(values.length);
+    } else {
+      newSize = initialEntryCount;
+      initialized = true;
+    }
+
+    valueIsNull = Arrays.copyOf(valueIsNull, newSize);
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  /** Recomputes the retained size from this instance, both arrays and the optional status. */
+  private void updateDataSize() {
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
+    if (columnBuilderStatus != null) {
+      retainedSizeInBytes += ColumnBuilderStatus.INSTANCE_SIZE;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
new file mode 100644
index 0000000..3977500
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+
+public class RunLengthEncodedColumn implements Column {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(RunLengthEncodedColumn.class).instanceSize();
+
+  private final Column value;
+  private final int positionCount;
+
+  public RunLengthEncodedColumn(Column value, int positionCount) {
+    requireNonNull(value, "value is null");
+    if (value.getPositionCount() != 1) {
+      throw new IllegalArgumentException(
+          format(
+              "Expected value to contain a single position but has %s positions",
+              value.getPositionCount()));
+    }
+
+    if (value instanceof RunLengthEncodedColumn) {
+      this.value = ((RunLengthEncodedColumn) value).getValue();
+    } else {
+      this.value = value;
+    }
+
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+
+    this.positionCount = positionCount;
+  }
+
+  public Column getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean getBoolean(int position) {
+    checkReadablePosition(position);
+    return value.getBoolean(position);
+  }
+
+  @Override
+  public int getInt(int position) {
+    checkReadablePosition(position);
+    return value.getInt(position);
+  }
+
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return value.getLong(position);
+  }
+
+  @Override
+  public float getFloat(int position) {
+    checkReadablePosition(position);
+    return value.getFloat(position);
+  }
+
+  @Override
+  public double getDouble(int position) {
+    checkReadablePosition(position);
+    return value.getDouble(position);
+  }
+
+  @Override
+  public Binary getBinary(int position) {
+    checkReadablePosition(position);
+    return value.getBinary(position);
+  }
+
+  @Override
+  public TsPrimitiveType getTsPrimitiveType(int position) {
+    checkReadablePosition(position);
+    return value.getTsPrimitiveType(position);
+  }
+
+  @Override
+  public boolean isNull(int position) {
+    checkReadablePosition(position);
+    return value.isNull(0);
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return INSTANCE_SIZE + value.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(positionCount, positionOffset, length);
+    return new RunLengthEncodedColumn(value, length);
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= positionCount) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
new file mode 100644
index 0000000..c8d4748
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Column of timestamps backed by a {@code long} array. It carries no null flags and
+ * {@link #isNull(int)} always answers false. A column is a window (array offset plus position
+ * count) over the array; regions created by {@link #getRegion(int, int)} share the array.
+ */
+public class TimeColumn implements Column {
+
+  // measured on TimeColumn itself, not on LongColumn, whose layout has an extra null-flag field
+  private static final int INSTANCE_SIZE = ClassLayout.parseClass(TimeColumn.class).instanceSize();
+  public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES;
+
+  private final int arrayOffset;
+  private final int positionCount;
+  private final long[] values;
+
+  private final long retainedSizeInBytes;
+
+  /**
+   * Creates a column covering the first {@code positionCount} entries of {@code values}.
+   *
+   * @param positionCount number of positions in the column
+   * @param values backing timestamp array
+   */
+  public TimeColumn(int positionCount, long[] values) {
+    this(0, positionCount, values);
+  }
+
+  /**
+   * Creates a column over {@code positionCount} entries starting at {@code arrayOffset}.
+   *
+   * @throws IllegalArgumentException if the offset or count is negative, or if the array is too
+   *     short to hold the requested window
+   */
+  TimeColumn(int arrayOffset, int positionCount, long[] values) {
+    if (arrayOffset < 0) {
+      throw new IllegalArgumentException("arrayOffset is negative");
+    }
+    this.arrayOffset = arrayOffset;
+    if (positionCount < 0) {
+      throw new IllegalArgumentException("positionCount is negative");
+    }
+    this.positionCount = positionCount;
+
+    if (values.length - arrayOffset < positionCount) {
+      throw new IllegalArgumentException("values length is less than positionCount");
+    }
+    this.values = values;
+
+    retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
+  }
+
+  /**
+   * @param position position inside this column
+   * @return the timestamp stored at that position
+   * @throws IllegalArgumentException if the position is outside the column
+   */
+  @Override
+  public long getLong(int position) {
+    checkReadablePosition(position);
+    return values[position + arrayOffset];
+  }
+
+  /** Always false; the position is not validated. */
+  @Override
+  public boolean isNull(int position) {
+    return false;
+  }
+
+  @Override
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * Returns a view of {@code length} positions starting at {@code positionOffset}. The view is
+   * constructed from the same backing array as this column.
+   */
+  @Override
+  public Column getRegion(int positionOffset, int length) {
+    checkValidRegion(getPositionCount(), positionOffset, length);
+    return new TimeColumn(positionOffset + arrayOffset, length, values);
+  }
+
+  /**
+   * Returns the timestamp at the last position of this column. No bounds check is performed, so
+   * the column must contain at least one position.
+   */
+  public long getEndTime() {
+    return values[getPositionCount() + arrayOffset - 1];
+  }
+
+  private void checkReadablePosition(int position) {
+    if (position < 0 || position >= getPositionCount()) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
new file mode 100644
index 0000000..9b455cb
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+
+import static java.lang.Math.max;
+import static org.apache.iotdb.tsfile.read.common.block.column.ColumnUtil.calculateBlockResetSize;
+import static org.openjdk.jol.util.VMSupport.sizeOf;
+
+/**
+ * Builder that accumulates timestamps in a growable {@code long} array and turns them into a
+ * {@link TimeColumn}. Nulls and column merging are not supported.
+ */
+public class TimeColumnBuilder implements ColumnBuilder {
+
+  private static final int INSTANCE_SIZE =
+      ClassLayout.parseClass(TimeColumnBuilder.class).instanceSize();
+
+  // may be null; when present it is told how many bytes each appended position accounts for
+  private final ColumnBuilderStatus columnBuilderStatus;
+  // false until the first growCapacity() call has allocated the array
+  private boolean initialized;
+  private final int initialEntryCount;
+
+  private int positionCount;
+
+  private long[] values = new long[0];
+
+  private long retainedSizeInBytes;
+
+  /**
+   * @param columnBuilderStatus status object notified of appended bytes, may be null
+   * @param expectedEntries size of the first array allocation; values below 1 are raised to 1
+   */
+  public TimeColumnBuilder(ColumnBuilderStatus columnBuilderStatus, int expectedEntries) {
+    this.columnBuilderStatus = columnBuilderStatus;
+    this.initialEntryCount = max(expectedEntries, 1);
+
+    updateDataSize();
+  }
+
+  /**
+   * Appends a timestamp at the next position.
+   *
+   * @return this builder
+   */
+  @Override
+  public ColumnBuilder writeLong(long value) {
+    final int slot = positionCount;
+    if (slot >= values.length) {
+      growCapacity();
+    }
+
+    values[slot] = value;
+    positionCount = slot + 1;
+
+    if (columnBuilderStatus != null) {
+      columnBuilderStatus.addBytes(TimeColumn.SIZE_IN_BYTES_PER_POSITION);
+    }
+    return this;
+  }
+
+  /** Not supported by the time column builder. */
+  @Override
+  public int appendColumn(
+      TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** Not supported by the time column builder. */
+  @Override
+  public ColumnBuilder appendNull() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** @return a {@link TimeColumn} created from the timestamps written so far */
+  @Override
+  public Column build() {
+    return new TimeColumn(0, positionCount, values);
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return retainedSizeInBytes;
+  }
+
+  /**
+   * Creates an empty builder of the same kind, sized from the number of positions this builder
+   * currently holds.
+   */
+  @Override
+  public ColumnBuilder newColumnBuilderLike(ColumnBuilderStatus columnBuilderStatus) {
+    int expectedEntries = calculateBlockResetSize(positionCount);
+    return new TimeColumnBuilder(columnBuilderStatus, expectedEntries);
+  }
+
+  /** @return the number of timestamps written so far */
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  /**
+   * @param position index of an already written timestamp
+   * @return the timestamp written at that position
+   * @throws IllegalArgumentException if the position is negative or not yet written
+   */
+  public long getTime(int position) {
+    checkReadablePosition(position);
+    return values[position];
+  }
+
+  /**
+   * Enlarges the backing array: to initialEntryCount on the first call, afterwards to the size
+   * ColumnUtil derives from the current length.
+   */
+  private void growCapacity() {
+    final int newSize =
+        initialized ? ColumnUtil.calculateNewArraySize(values.length) : initialEntryCount;
+    initialized = true;
+
+    values = Arrays.copyOf(values, newSize);
+    updateDataSize();
+  }
+
+  /** Recomputes the retained size from this instance, the array and the optional status. */
+  private void updateDataSize() {
+    long size = INSTANCE_SIZE + sizeOf(values);
+    retainedSizeInBytes =
+        columnBuilderStatus == null ? size : size + ColumnBuilderStatus.INSTANCE_SIZE;
+  }
+
+  private void checkReadablePosition(int position) {
+    boolean readable = position >= 0 && position < getPositionCount();
+    if (!readable) {
+      throw new IllegalArgumentException("position is not valid");
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index db76550..3affee8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.tsfile.read.reader;
 
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 2efe6802..dd75b21 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -25,7 +25,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
@@ -105,8 +107,38 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
 
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
-    // TODO need to implement for mpp
-    throw new IllegalStateException("We have not implemented this method.");
+    // TODO change from the row-based style to column-based style
+    TsBlockBuilder builder =
+        new TsBlockBuilder(
+            valuePageReaderList.stream()
+                .map(ValuePageReader::getDataType)
+                .collect(Collectors.toList()));
+    int timeIndex = -1;
+    while (timePageReader.hasNextTime()) {
+      long timestamp = timePageReader.nextTime();
+      timeIndex++;
+      // if all the sub sensors' values are null in the current row, just discard it
+      boolean isNull = true;
+      Object notNullObject = null;
+      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
+      for (int i = 0; i < v.length; i++) {
+        ValuePageReader pageReader = valuePageReaderList.get(i);
+        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
+        if (v[i] != null) {
+          isNull = false;
+          notNullObject = v[i].getValue();
+        }
+      }
+      // Currently, a value filter only accepts an AlignedPath with exactly one sub sensor, so
+      // notNullObject (the last non-null value seen in this row) is the value to test
+      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
+        builder.getTimeColumnBuilder().writeLong(timestamp);
+        for (int i = 0; i < v.length; i++) {
+          builder.getColumnBuilder(i).writeTsPrimitiveType(v[i]);
+        }
+      }
+    }
+    return builder.build();
   }
 
   public void setDeleteIntervalList(List<List<TimeRange>> list) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index c1a4c59..ec5ff85 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -26,7 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.common.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
@@ -35,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 public class PageReader implements IPageReader {
@@ -157,53 +161,76 @@ public class PageReader implements IPageReader {
   @Override
   public TsBlock getAllSatisfiedData(boolean ascending) throws IOException {
     // TODO we still need to consider data type, ascending and descending here
-    TsBlock tsBlock = new TsBlock();
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType));
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder valueBuilder = tsBlockBuilder.getColumnBuilder(0);
     if (filter == null || filter.satisfy(getStatistics())) {
-      while (timeDecoder.hasNext(timeBuffer)) {
-        long timestamp = timeDecoder.readLong(timeBuffer);
-        switch (dataType) {
-          case BOOLEAN:
+      switch (dataType) {
+        case BOOLEAN:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
-              //              tsBlock.putBoolean(timestamp, aBoolean);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeBoolean(aBoolean);
             }
-            break;
-          case INT32:
+          }
+          break;
+        case INT32:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             int anInt = valueDecoder.readInt(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
-              //              tsBlock.putInt(timestamp, anInt);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeInt(anInt);
             }
-            break;
-          case INT64:
+          }
+          break;
+        case INT64:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             long aLong = valueDecoder.readLong(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
-              //              tsBlock.putLong(timestamp, aLong);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeLong(aLong);
             }
-            break;
-          case FLOAT:
+          }
+          break;
+        case FLOAT:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             float aFloat = valueDecoder.readFloat(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
-              //              tsBlock.putFloat(timestamp, aFloat);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeFloat(aFloat);
             }
-            break;
-          case DOUBLE:
+          }
+          break;
+        case DOUBLE:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             double aDouble = valueDecoder.readDouble(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
-              //              tsBlock.putDouble(timestamp, aDouble);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeDouble(aDouble);
             }
-            break;
-          case TEXT:
+          }
+          break;
+        case TEXT:
+          while (timeDecoder.hasNext(timeBuffer)) {
+            long timestamp = timeDecoder.readLong(timeBuffer);
             Binary aBinary = valueDecoder.readBinary(valueBuffer);
             if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
-              //              tsBlock.putBinary(timestamp, aBinary);
+              timeBuilder.writeLong(timestamp);
+              valueBuilder.writeBinary(aBinary);
             }
-            break;
-          default:
-            throw new UnSupportedDataTypeException(String.valueOf(dataType));
-        }
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return tsBlock;
+    return tsBlockBuilder.build();
   }
 
   @Override