You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/12 00:51:34 UTC

[iotdb] branch master updated: [IOTDB-2727] tsblock serde (#5459)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8282f94043 [IOTDB-2727] tsblock serde (#5459)
8282f94043 is described below

commit 8282f94043cf0c49d20a7c6f966cb0018d26e8cf
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Apr 12 08:51:30 2022 +0800

    [IOTDB-2727] tsblock serde (#5459)
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   9 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |   5 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |   1 +
 .../iotdb/db/mpp/buffer/TsBlockSerdeFactory.java   |   2 +
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  27 +++-
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |   1 +
 .../java/org/apache/iotdb/db/mpp/buffer/Utils.java |   1 +
 tsfile/pom.xml                                     |  22 ++++
 .../read/common/block/column/BinaryColumn.java     |  16 +++
 .../common/block/column/BinaryColumnBuilder.java   |   6 +
 .../read/common/block/column/BooleanColumn.java    |  16 +++
 .../common/block/column/BooleanColumnBuilder.java  |   6 +
 .../tsfile/read/common/block/column/Column.java    |  13 ++
 .../read/common/block/column/ColumnBuilder.java    |   4 +
 .../read/common/block/column/ColumnEncoder.java    | 108 ++++++++++++++++
 .../common/block/column/ColumnEncoderFactory.java  |  24 ++--
 .../read/common/block/column/ColumnEncoding.java   |  64 +++++++++
 .../read/common/block/column/DoubleColumn.java     |  16 +++
 .../common/block/column/DoubleColumnBuilder.java   |   6 +
 .../read/common/block/column/FloatColumn.java      |  16 +++
 .../common/block/column/FloatColumnBuilder.java    |   6 +
 .../block/column/Int32ArrayColumnEncoder.java      |  90 +++++++++++++
 .../block/column/Int64ArrayColumnEncoder.java      |  90 +++++++++++++
 .../tsfile/read/common/block/column/IntColumn.java |  16 +++
 .../read/common/block/column/IntColumnBuilder.java |   6 +
 .../read/common/block/column/LongColumn.java       |  16 +++
 .../common/block/column/LongColumnBuilder.java     |   6 +
 .../block/column/RunLengthEncodedColumn.java       |  16 +++
 .../read/common/block/column/TimeColumn.java       |  17 +++
 .../common/block/column/TimeColumnBuilder.java     |   7 +
 .../read/common/block/column/TsBlockSerde.java     | 125 ++++++++++++++++++
 .../tsfile/common/block/ColumnEncoderTest.java     | 144 +++++++++++++++++++++
 .../common/block/Int32ArrayColumnEncoderTest.java  | 113 ++++++++++++++++
 .../common/block/Int64ArrayColumnEncoderTest.java  | 114 ++++++++++++++++
 .../tsfile/common/block/TsBlockSerdeTest.java      |  97 ++++++++++++++
 35 files changed, 1206 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 42857b65e9..00e955e9a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
 import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
 import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.thrift.TException;
@@ -79,8 +80,12 @@ public class DataBlockManager implements IDataBlockManager {
       GetDataBlockResponse resp = new GetDataBlockResponse();
       SinkHandle sinkHandle = sinkHandles.get(req.getSourceFragmentInstanceId());
       for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
-        ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
-        resp.addToTsBlocks(serializedTsBlock);
+        try {
+          ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
+          resp.addToTsBlocks(serializedTsBlock);
+        } catch (IOException e) {
+          throw new TException(e);
+        }
       }
       return resp;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index a3e106b5f9..a5fa545bcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
@@ -251,13 +252,13 @@ public class SinkHandle implements ISinkHandle {
     throw new UnsupportedOperationException();
   }
 
-  ByteBuffer getSerializedTsBlock(int sequenceId) {
+  ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
     TsBlock tsBlock;
     tsBlock = sequenceIdToTsBlock.get(sequenceId);
     if (tsBlock == null) {
       throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
     }
-    return serde.serialized(tsBlock);
+    return serde.serialize(tsBlock);
   }
 
   void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 7366f3630c..b577174702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
 import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerdeFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerdeFactory.java
index 7aa7329987..032af08e26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerdeFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerdeFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
 import java.util.function.Supplier;
 
 public class TsBlockSerdeFactory implements Supplier<TsBlockSerde> {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 0c03412f39..64229ebe30 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -83,7 +83,7 @@ public class SinkHandleTest {
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockClient,
-            new TsBlockSerde(),
+            Utils.createMockTsBlockSerde(mockTsBlockSize),
             mockSinkHandleListener);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -125,7 +125,12 @@ public class SinkHandleTest {
 
     // Get tsblocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
-      sinkHandle.getSerializedTsBlock(i);
+      try {
+        sinkHandle.getSerializedTsBlock(i);
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
       Assert.assertTrue(sinkHandle.isFull().isDone());
     }
     Assert.assertFalse(sinkHandle.isFinished());
@@ -210,7 +215,7 @@ public class SinkHandleTest {
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockClient,
-            new TsBlockSerde(),
+            Utils.createMockTsBlockSerde(mockTsBlockSize),
             mockSinkHandleListener);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -251,7 +256,12 @@ public class SinkHandleTest {
 
     // Get tsblocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
-      sinkHandle.getSerializedTsBlock(i);
+      try {
+        sinkHandle.getSerializedTsBlock(i);
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
       Assert.assertFalse(sinkHandle.isFull().isDone());
     }
     Assert.assertFalse(sinkHandle.isFinished());
@@ -324,7 +334,12 @@ public class SinkHandleTest {
 
     // Get tsblocks after the SinkHandle is closed.
     for (int i = numOfMockTsBlock; i < numOfMockTsBlock * 2; i++) {
-      sinkHandle.getSerializedTsBlock(i);
+      try {
+        sinkHandle.getSerializedTsBlock(i);
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
     }
     Assert.assertFalse(sinkHandle.isFinished());
 
@@ -382,7 +397,7 @@ public class SinkHandleTest {
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockClient,
-            new TsBlockSerde(),
+            Utils.createMockTsBlockSerde(mockTsBlockSize),
             mockSinkHandleListener);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 77b987a1c1..55c40e8015 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
 import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import org.apache.thrift.TException;
 import org.junit.Assert;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
index e61e623937..b0f778d989 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/Utils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import com.google.common.util.concurrent.SettableFuture;
 import org.mockito.Mockito;
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index 44f949bf34..edb728b8ad 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -76,6 +76,28 @@
             <groupId>io.airlift</groupId>
             <artifactId>slice</artifactId>
         </dependency>
+        <!-- for mocked tests -->
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>4.0.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 8828393a68..fee3ed2abe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -68,6 +69,16 @@ public class BinaryColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.TEXT;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.BINARY_ARRAY;
+  }
+
   @Override
   public Binary getBinary(int position) {
     checkReadablePosition(position);
@@ -80,6 +91,11 @@ public class BinaryColumn implements Column {
     return new TsPrimitiveType.TsBinary(getBinary(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index ca95163ea1..beb17a3ea5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -112,6 +113,11 @@ public class BinaryColumnBuilder implements ColumnBuilder {
     return new BinaryColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.TEXT;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     // TODO we need to sum up all the Binary's retainedSize here
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 54544d3650..fd736dac37 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -67,6 +68,16 @@ public class BooleanColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.BOOLEAN;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.BYTE_ARRAY;
+  }
+
   @Override
   public boolean getBoolean(int position) {
     checkReadablePosition(position);
@@ -79,6 +90,11 @@ public class BooleanColumn implements Column {
     return new TsPrimitiveType.TsBoolean(getBoolean(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
index da5760e7a1..74fedecd60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -117,6 +118,11 @@ public class BooleanColumnBuilder implements ColumnBuilder {
     return new BooleanColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.BOOLEAN;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index adc06a1f52..000caba418 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -18,11 +18,18 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public interface Column {
 
+  /** Get the data type. */
+  TSDataType getDataType();
+
+  /** Get the encoding for this column. */
+  ColumnEncoding getEncoding();
+
   /** Gets a boolean at {@code position}. */
   default boolean getBoolean(int position) {
     throw new UnsupportedOperationException(getClass().getName());
@@ -58,6 +65,12 @@ public interface Column {
     throw new UnsupportedOperationException(getClass().getName());
   }
 
+  /**
+   * Is it possible the column may have a null value? If false, the column cannot contain a null,
+   * but if true, the column may or may not have a null.
+   */
+  boolean mayHaveNull();
+
   /**
    * Is the specified position null?
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
index a16e5fafee..37efd66949 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -67,6 +68,9 @@ public interface ColumnBuilder {
   /** Builds the block. This method can be called multiple times. */
   Column build();
 
+  /** Get the data type. */
+  TSDataType getDataType();
+
   /**
    * Returns the retained size of this column in memory, including over-allocations. This method is
    * called from the inner most execution loop and must be fast.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoder.java
new file mode 100644
index 0000000000..5520437eff
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface ColumnEncoder {
+
+  /** Read a column from the specified input. */
+  void readColumn(ColumnBuilder columnBuilder, ByteBuffer input, int positionCount);
+
+  /** Write the specified column to the specified output. */
+  void writeColumn(DataOutputStream output, Column column) throws IOException;
+
+  static void serializeNullIndicators(DataOutputStream output, Column column) throws IOException {
+    boolean mayHaveNull = column.mayHaveNull();
+    output.writeBoolean(mayHaveNull);
+    if (!mayHaveNull) {
+      return; // only the flag byte is written
+    }
+
+    int positionCount = column.getPositionCount();
+    byte[] packedIsNull = new byte[((positionCount & ~0b111) + 1) / 8]; // == positionCount / 8
+    int currentByte = 0;
+
+    for (int position = 0; position < (positionCount & ~0b111); position += 8, currentByte++) {
+      byte value = 0;
+      value |= column.isNull(position) ? 0b1000_0000 : 0;
+      value |= column.isNull(position + 1) ? 0b0100_0000 : 0;
+      value |= column.isNull(position + 2) ? 0b0010_0000 : 0;
+      value |= column.isNull(position + 3) ? 0b0001_0000 : 0;
+      value |= column.isNull(position + 4) ? 0b0000_1000 : 0;
+      value |= column.isNull(position + 5) ? 0b0000_0100 : 0;
+      value |= column.isNull(position + 6) ? 0b0000_0010 : 0;
+      value |= column.isNull(position + 7) ? 0b0000_0001 : 0;
+      packedIsNull[currentByte] = value;
+    }
+
+    output.write(packedIsNull);
+
+    // write the remaining (positionCount % 8) null bits, MSB first, in one trailing byte
+    if ((positionCount & 0b111) > 0) {
+      byte value = 0;
+      int mask = 0b1000_0000;
+      for (int position = positionCount & ~0b111; position < positionCount; position++) {
+        value |= column.isNull(position) ? mask : 0;
+        mask >>>= 1;
+      }
+      output.write(value);
+    }
+  }
+
+  static boolean[] deserializeNullIndicators(ByteBuffer input, int positionCount) {
+    boolean mayHaveNull = input.get() != 0;
+    if (!mayHaveNull) {
+      return null; // flag byte only: no packed bitmap follows
+    }
+
+    byte[] packedIsNull = new byte[(positionCount + 7) / 8];
+    input.get(packedIsNull);
+
+    // read null bits 8 at a time; the most significant bit maps to the lowest position
+    boolean[] valueIsNull = new boolean[positionCount];
+    int currentByte = 0;
+    for (int position = 0; position < (positionCount & ~0b111); position += 8, currentByte++) {
+      byte value = packedIsNull[currentByte];
+      valueIsNull[position] = ((value & 0b1000_0000) != 0);
+      valueIsNull[position + 1] = ((value & 0b0100_0000) != 0);
+      valueIsNull[position + 2] = ((value & 0b0010_0000) != 0);
+      valueIsNull[position + 3] = ((value & 0b0001_0000) != 0);
+      valueIsNull[position + 4] = ((value & 0b0000_1000) != 0);
+      valueIsNull[position + 5] = ((value & 0b0000_0100) != 0);
+      valueIsNull[position + 6] = ((value & 0b0000_0010) != 0);
+      valueIsNull[position + 7] = ((value & 0b0000_0001) != 0);
+    }
+
+    // read the remaining (positionCount % 8) null bits from the final packed byte, MSB first
+    if ((positionCount & 0b111) > 0) {
+      byte value = packedIsNull[packedIsNull.length - 1];
+      int mask = 0b1000_0000;
+      for (int position = positionCount & ~0b111; position < positionCount; position++) {
+        valueIsNull[position] = ((value & mask) != 0);
+        mask >>>= 1;
+      }
+    }
+
+    return valueIsNull;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerde.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoderFactory.java
similarity index 54%
rename from server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerde.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoderFactory.java
index 8fd0329772..e45af001d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/TsBlockSerde.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoderFactory.java
@@ -17,20 +17,24 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.buffer;
+package org.apache.iotdb.tsfile.read.common.block.column;
 
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import java.util.HashMap;
+import java.util.Map;
 
-import java.nio.ByteBuffer;
+public class ColumnEncoderFactory {
 
-public class TsBlockSerde {
-  public ByteBuffer serialized(TsBlock tsBlock) {
-    // TODO: implement
-    return null;
+  private static Map<ColumnEncoding, ColumnEncoder> encodingToEncoder = new HashMap<>();
+
+  static {
+    encodingToEncoder.put(ColumnEncoding.INT32_ARRAY, new Int32ArrayColumnEncoder());
+    encodingToEncoder.put(ColumnEncoding.INT64_ARRAY, new Int64ArrayColumnEncoder());
   }
 
-  public TsBlock deserialize(ByteBuffer buffer) {
-    // TODO: implement
-    return null;
+  public static ColumnEncoder get(ColumnEncoding columnEncoding) {
+    if (!encodingToEncoder.containsKey(columnEncoding)) {
+      throw new IllegalArgumentException("Unsupported column encoding: " + columnEncoding);
+    }
+    return encodingToEncoder.get(columnEncoding);
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoding.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoding.java
new file mode 100644
index 0000000000..0c7f2a70b8
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnEncoding.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public enum ColumnEncoding {
+  /** BOOLEAN. */
+  BYTE_ARRAY((byte) 0),
+  /** INT32, FLOAT. */
+  INT32_ARRAY((byte) 1),
+  /** INT64, DOUBLE. */
+  INT64_ARRAY((byte) 2),
+  /** TEXT. */
+  BINARY_ARRAY((byte) 3);
+
+  private final byte value;
+
+  ColumnEncoding(byte value) {
+    this.value = value;
+  }
+
+  public static ColumnEncoding deserializeFrom(ByteBuffer buffer) {
+    return getColumnEncoding(buffer.get()); // reads exactly one byte from the buffer
+  }
+
+  public void serializeTo(DataOutputStream stream) throws IOException {
+    stream.writeByte(value); // the encoding is written as its single-byte tag
+  }
+
+  private static ColumnEncoding getColumnEncoding(byte value) {
+    switch (value) {
+      case 0:
+        return BYTE_ARRAY;
+      case 1:
+        return INT32_ARRAY;
+      case 2:
+        return INT64_ARRAY;
+      case 3:
+        return BINARY_ARRAY;
+      default:
+        throw new IllegalArgumentException("Invalid value: " + value);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 32809b02f6..cf3a35282b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -67,6 +68,16 @@ public class DoubleColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.DOUBLE;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.INT64_ARRAY;
+  }
+
   @Override
   public double getDouble(int position) {
     checkReadablePosition(position);
@@ -79,6 +90,11 @@ public class DoubleColumn implements Column {
     return new TsPrimitiveType.TsDouble(getDouble(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
index cdfcc35fc6..bff93c61df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -117,6 +118,11 @@ public class DoubleColumnBuilder implements ColumnBuilder {
     return new DoubleColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.DOUBLE;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 51a2675dae..bb27a22f86 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -66,6 +67,16 @@ public class FloatColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.FLOAT;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.INT32_ARRAY;
+  }
+
   @Override
   public float getFloat(int position) {
     checkReadablePosition(position);
@@ -78,6 +89,11 @@ public class FloatColumn implements Column {
     return new TsPrimitiveType.TsFloat(getFloat(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
index 142e711ffd..0061baa03e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -117,6 +118,11 @@ public class FloatColumnBuilder implements ColumnBuilder {
     return new FloatColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.FLOAT;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java
new file mode 100644
index 0000000000..4e0bfed482
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
+
+public class Int32ArrayColumnEncoder implements ColumnEncoder {
+
+  @Override
+  public void readColumn(ColumnBuilder columnBuilder, ByteBuffer input, int positionCount) {
+    // Appends positionCount entries decoded from input to columnBuilder.
+    // Serialized data layout:
+    //    +---------------+-----------------+-------------+
+    //    | may have null | null indicators |   values    |
+    //    +---------------+-----------------+-------------+
+    //    | byte          | list[byte]      | list[int32] |
+    //    +---------------+-----------------+-------------+
+    // If nullIndicators is null every position is non-null; null positions consume no value bytes.
+    boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount);
+    // The builder's data type decides if a value is kept as int or reinterpreted as float.
+    TSDataType dataType = columnBuilder.getDataType();
+    if (INT32.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (nullIndicators == null || !nullIndicators[i]) {
+          columnBuilder.writeInt(input.getInt());
+        } else {
+          columnBuilder.appendNull();
+        }
+      }
+    } else if (FLOAT.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (nullIndicators == null || !nullIndicators[i]) {
+          columnBuilder.writeFloat(Float.intBitsToFloat(input.getInt()));
+        } else {
+          columnBuilder.appendNull();
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void writeColumn(DataOutputStream output, Column column) throws IOException {
+    // Writes the null indicators first, then one 4-byte value per non-null position.
+    ColumnEncoder.serializeNullIndicators(output, column);
+    // Null positions are skipped entirely; FLOAT values are written as their int bit pattern.
+    TSDataType dataType = column.getDataType();
+    int positionCount = column.getPositionCount();
+    if (INT32.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!column.isNull(i)) {
+          output.writeInt(column.getInt(i));
+        }
+      }
+    } else if (FLOAT.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!column.isNull(i)) {
+          output.writeInt(Float.floatToIntBits(column.getFloat(i)));
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid data type: " + dataType);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java
new file mode 100644
index 0000000000..bfff7aed9d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
+
+public class Int64ArrayColumnEncoder implements ColumnEncoder {
+
+  @Override
+  public void readColumn(ColumnBuilder columnBuilder, ByteBuffer input, int positionCount) {
+    // Appends positionCount entries decoded from input to columnBuilder.
+    // Serialized data layout:
+    //    +---------------+-----------------+-------------+
+    //    | may have null | null indicators |   values    |
+    //    +---------------+-----------------+-------------+
+    //    | byte          | list[byte]      | list[int64] |
+    //    +---------------+-----------------+-------------+
+    // If nullIndicators is null every position is non-null; null positions consume no value bytes.
+    boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount);
+    // The builder's data type decides if a value is kept as long or reinterpreted as double.
+    TSDataType dataType = columnBuilder.getDataType();
+    if (INT64.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (nullIndicators == null || !nullIndicators[i]) {
+          columnBuilder.writeLong(input.getLong());
+        } else {
+          columnBuilder.appendNull();
+        }
+      }
+    } else if (DOUBLE.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (nullIndicators == null || !nullIndicators[i]) {
+          columnBuilder.writeDouble(Double.longBitsToDouble(input.getLong()));
+        } else {
+          columnBuilder.appendNull();
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void writeColumn(DataOutputStream output, Column column) throws IOException {
+    // Writes the null indicators first, then one 8-byte value per non-null position.
+    ColumnEncoder.serializeNullIndicators(output, column);
+    // Null positions are skipped entirely; DOUBLE values are written as their long bit pattern.
+    TSDataType dataType = column.getDataType();
+    int positionCount = column.getPositionCount();
+    if (INT64.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!column.isNull(i)) {
+          output.writeLong(column.getLong(i));
+        }
+      }
+    } else if (DOUBLE.equals(dataType)) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!column.isNull(i)) {
+          output.writeLong(Double.doubleToLongBits(column.getDouble(i)));
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid data type: " + dataType);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 0d48dd2133..ce577dfc69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -66,6 +67,16 @@ public class IntColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT32;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.INT32_ARRAY;
+  }
+
   @Override
   public int getInt(int position) {
     checkReadablePosition(position);
@@ -78,6 +89,11 @@ public class IntColumn implements Column {
     return new TsPrimitiveType.TsInt(getInt(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
index b9f8e670cc..3a5db5b773 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -117,6 +118,11 @@ public class IntColumnBuilder implements ColumnBuilder {
     return new IntColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT32;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 345e71d5bc..f6f86f6177 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -66,6 +67,16 @@ public class LongColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT64;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.INT64_ARRAY;
+  }
+
   @Override
   public long getLong(int position) {
     checkReadablePosition(position);
@@ -78,6 +89,11 @@ public class LongColumn implements Column {
     return new TsPrimitiveType.TsLong(getLong(position));
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return valueIsNull != null;
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
index 12443fec3c..38afe3d711 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -117,6 +118,11 @@ public class LongColumnBuilder implements ColumnBuilder {
     return new LongColumn(0, positionCount, hasNullValue ? valueIsNull : null, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT64;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 39775002dd..36ea1f4929 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -61,6 +62,16 @@ public class RunLengthEncodedColumn implements Column {
     return value;
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return value.getDataType();
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return value.getEncoding();
+  }
+
   @Override
   public boolean getBoolean(int position) {
     checkReadablePosition(position);
@@ -103,6 +114,11 @@ public class RunLengthEncodedColumn implements Column {
     return value.getTsPrimitiveType(position);
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return value.mayHaveNull();
+  }
+
   @Override
   public boolean isNull(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index c46fbd2ea3..32de9dcec9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
 import org.openjdk.jol.info.ClassLayout;
 
 import static io.airlift.slice.SizeOf.sizeOf;
@@ -56,12 +58,27 @@ public class TimeColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT64;
+  }
+
+  @Override
+  public ColumnEncoding getEncoding() {
+    return ColumnEncoding.INT64_ARRAY;
+  }
+
   @Override
   public long getLong(int position) {
     checkReadablePosition(position);
     return values[position + arrayOffset];
   }
 
+  @Override
+  public boolean mayHaveNull() {
+    return false;
+  }
+
   @Override
   public boolean isNull(int position) {
     return false;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
index 89267c5b25..7b5dee9368 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
 import org.openjdk.jol.info.ClassLayout;
 
 import java.util.Arrays;
@@ -79,6 +81,11 @@ public class TimeColumnBuilder implements ColumnBuilder {
     return new TimeColumn(0, positionCount, values);
   }
 
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.INT64;
+  }
+
   @Override
   public long getRetainedSizeInBytes() {
     return retainedSizeInBytes;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TsBlockSerde.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TsBlockSerde.java
new file mode 100644
index 0000000000..ca1b9f22c0
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TsBlockSerde.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsBlockSerde {
+
+  /**
+   * Deserialize a tsblock, reading the sections in the order shown in the layout below.
+   *
+   * @param byteBuffer Serialized tsblock; it is consumed with relative reads from its position.
+   * @return Deserialized tsblock built from the time column and all value columns.
+   */
+  public TsBlock deserialize(ByteBuffer byteBuffer) {
+
+    // Serialized tsblock:
+    //    +-------------+---------------+---------+------------+-----------+----------+
+    //    | val col cnt | val col types | pos cnt | encodings  | time col  | val cols |
+    //    +-------------+---------------+---------+------------+-----------+----------+
+    //    | int32       | list[byte]    | int32   | list[byte] |  bytes    | bytes    |
+    //    +-------------+---------------+---------+------------+-----------+----------+
+
+    // Value column count.
+    int valueColumnCount = byteBuffer.getInt();
+
+    // Value column data types, one per value column.
+    List<TSDataType> valueColumnDataTypes = new ArrayList<>(valueColumnCount);
+    for (int i = 0; i < valueColumnCount; i++) {
+      valueColumnDataTypes.add(TSDataType.deserializeFrom(byteBuffer));
+    }
+
+    // Position count.
+    int positionCount = byteBuffer.getInt();
+
+    TsBlockBuilder builder = new TsBlockBuilder(positionCount, valueColumnDataTypes);
+    builder.declarePositions(positionCount);
+
+    // Column encodings: the time column's encoding first, then one per value column.
+    List<ColumnEncoding> columnEncodings = new ArrayList<>(valueColumnCount + 1);
+    for (int i = 0; i < valueColumnCount + 1; i++) {
+      columnEncodings.add(ColumnEncoding.deserializeFrom(byteBuffer));
+    }
+
+    // Time column (encoding index 0).
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    ColumnEncoderFactory.get(columnEncodings.get(0))
+        .readColumn(timeColumnBuilder, byteBuffer, positionCount);
+
+    for (int i = 0; i < valueColumnCount; i++) {
+      // Value column i uses encoding index 1 + i because index 0 belongs to the time column.
+      ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
+      ColumnEncoderFactory.get(columnEncodings.get(1 + i))
+          .readColumn(columnBuilder, byteBuffer, positionCount);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Serialize a tsblock in the section order that {@link #deserialize(ByteBuffer)} reads back.
+   *
+   * @param tsBlock The tsblock to serialize.
+   * @return A buffer wrapping the serialized bytes.
+   */
+  public ByteBuffer serialize(TsBlock tsBlock) throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+
+    // Value column count.
+    dataOutputStream.writeInt(tsBlock.getValueColumnCount());
+
+    // Value column data types, one per value column.
+    for (int i = 0; i < tsBlock.getValueColumnCount(); i++) {
+      tsBlock.getColumn(i).getDataType().serializeTo(dataOutputStream);
+    }
+
+    // Position count.
+    dataOutputStream.writeInt(tsBlock.getPositionCount());
+
+    // Column encodings: the time column's encoding first, then one per value column.
+    tsBlock.getTimeColumn().getEncoding().serializeTo(dataOutputStream);
+    for (int i = 0; i < tsBlock.getValueColumnCount(); i++) {
+      tsBlock.getColumn(i).getEncoding().serializeTo(dataOutputStream);
+    }
+
+    // Time column, written with the encoder selected by its own encoding.
+    ColumnEncoder columnEncoder = ColumnEncoderFactory.get(tsBlock.getTimeColumn().getEncoding());
+    columnEncoder.writeColumn(dataOutputStream, tsBlock.getTimeColumn());
+
+    for (int i = 0; i < tsBlock.getValueColumnCount(); i++) {
+      // Value column i, written with the encoder selected by its own encoding.
+      columnEncoder = ColumnEncoderFactory.get(tsBlock.getColumn(i).getEncoding());
+      columnEncoder.writeColumn(dataOutputStream, tsBlock.getColumn(i));
+    }
+
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/ColumnEncoderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/ColumnEncoderTest.java
new file mode 100644
index 0000000000..bac561d776
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/ColumnEncoderTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnEncoder;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ColumnEncoderTest {
+  @Test
+  public void testSerializeNullIndicators() throws IOException {
+    // Construct a mock column with 7 positions whose even positions are null.
+    Column mockColumn = Mockito.mock(Column.class);
+    Mockito.doReturn(7).when(mockColumn).getPositionCount();
+    Mockito.doReturn(true).when(mockColumn).mayHaveNull();
+    Mockito.doAnswer(invocation -> (int) invocation.getArgument(0) % 2 == 0)
+        .when(mockColumn)
+        .isNull(Mockito.anyInt());
+    Mockito.doAnswer(
+            invocation ->
+                (int) invocation.getArgument(0) % 2 == 0 ? null : invocation.getArgument(0))
+        .when(mockColumn)
+        .getInt(Mockito.anyInt());
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
+    ColumnEncoder.serializeNullIndicators(output, mockColumn);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+    Assert.assertEquals(2, bytes.length);
+    Assert.assertEquals(1, bytes[0]);
+    Assert.assertEquals((byte) 0b1010_1010, bytes[1]);
+
+    // Change the position count to 8: the indicators still fit in a single byte.
+    Mockito.doReturn(8).when(mockColumn).getPositionCount();
+    byteArrayOutputStream = new ByteArrayOutputStream();
+    output = new DataOutputStream(byteArrayOutputStream);
+    ColumnEncoder.serializeNullIndicators(output, mockColumn);
+    bytes = byteArrayOutputStream.toByteArray();
+    Assert.assertEquals(2, bytes.length);
+    Assert.assertEquals(1, bytes[0]);
+    Assert.assertEquals((byte) 0b1010_1010, bytes[1]);
+
+    // Change the position count to 15: a second indicator byte is needed, so check both bytes.
+    Mockito.doReturn(15).when(mockColumn).getPositionCount();
+    byteArrayOutputStream = new ByteArrayOutputStream();
+    output = new DataOutputStream(byteArrayOutputStream);
+    ColumnEncoder.serializeNullIndicators(output, mockColumn);
+    bytes = byteArrayOutputStream.toByteArray();
+    Assert.assertEquals(3, bytes.length);
+    Assert.assertEquals(1, bytes[0]);
+    Assert.assertEquals((byte) 0b1010_1010, bytes[1]);
+    Assert.assertEquals((byte) 0b1010_1010, bytes[2]);
+  }
+
+  @Test
+  public void testDeserializeNullIndicators() {
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {(byte) 1, (byte) 0b1010_1010});
+    boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(buffer, 7);
+    Assert.assertNotNull(nullIndicators);
+    Assert.assertEquals(7, nullIndicators.length);
+    for (int i = 0; i < nullIndicators.length; i++) {
+      if (i % 2 == 0) {
+        Assert.assertTrue(nullIndicators[i]);
+      } else {
+        Assert.assertFalse(nullIndicators[i]);
+      }
+    }
+
+    buffer = ByteBuffer.wrap(new byte[] {(byte) 1, (byte) 0b1010_1010});
+    nullIndicators = ColumnEncoder.deserializeNullIndicators(buffer, 8);
+    Assert.assertNotNull(nullIndicators);
+    Assert.assertEquals(8, nullIndicators.length);
+    for (int i = 0; i < nullIndicators.length; i++) {
+      if (i % 2 == 0) {
+        Assert.assertTrue(nullIndicators[i]);
+      } else {
+        Assert.assertFalse(nullIndicators[i]);
+      }
+    }
+
+    buffer = ByteBuffer.wrap(new byte[] {(byte) 1, (byte) 0b1010_1010, (byte) 0b1010_1010});
+    nullIndicators = ColumnEncoder.deserializeNullIndicators(buffer, 15);
+    Assert.assertNotNull(nullIndicators);
+    Assert.assertEquals(15, nullIndicators.length);
+    for (int i = 0; i < nullIndicators.length; i++) {
+      if (i % 2 == 0) {
+        Assert.assertTrue(nullIndicators[i]);
+      } else {
+        Assert.assertFalse(nullIndicators[i]);
+      }
+    }
+  }
+
+  @Test
+  public void testSerializeNoNullIndicators() throws IOException {
+    // Mock int32 column with position count == 8.
+    Column mockColumn = Mockito.mock(Column.class);
+    Mockito.doReturn(8).when(mockColumn).getPositionCount();
+    Mockito.doReturn(false).when(mockColumn).mayHaveNull();
+    Mockito.doReturn(false).when(mockColumn).isNull(Mockito.anyInt());
+    Mockito.doAnswer(invocation -> invocation.getArgument(0))
+        .when(mockColumn)
+        .getInt(Mockito.anyInt());
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
+    ColumnEncoder.serializeNullIndicators(output, mockColumn);
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+    Assert.assertEquals(1, bytes.length);
+    Assert.assertEquals(0, bytes[0]);
+  }
+
+  @Test
+  public void testDeserializeNoNullIndicators() {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {0});
+    boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(byteBuffer, 8);
+    Assert.assertNull(nullIndicators);
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int32ArrayColumnEncoderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int32ArrayColumnEncoderTest.java
new file mode 100644
index 0000000000..9feaf9bbb2
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int32ArrayColumnEncoderTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Int32ArrayColumnEncoder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+public class Int32ArrayColumnEncoderTest {
+  @Test
+  public void testIntColumn() {
+    final int positionCount = 10;
+
+    boolean[] nullIndicators = new boolean[positionCount];
+    int[] values = new int[positionCount];
+    for (int i = 0; i < positionCount; i++) {
+      nullIndicators[i] = i % 2 == 0;
+      if (i % 2 != 0) {
+        values[i] = i;
+      }
+    }
+    IntColumn input = new IntColumn(positionCount, Optional.of(nullIndicators), values);
+    Int32ArrayColumnEncoder serde = new Int32ArrayColumnEncoder();
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(byteArrayOutputStream);
+    try {
+      serde.writeColumn(dos, input);
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    IntColumnBuilder intColumnBuilder = new IntColumnBuilder(null, positionCount);
+    serde.readColumn(intColumnBuilder, buffer, positionCount);
+    IntColumn output = (IntColumn) intColumnBuilder.build();
+    Assert.assertEquals(positionCount, output.getPositionCount());
+    Assert.assertTrue(output.mayHaveNull());
+    for (int i = 0; i < positionCount; i++) {
+      Assert.assertEquals(i % 2 == 0, output.isNull(i));
+      if (i % 2 != 0) {
+        Assert.assertEquals(i, output.getInt(i));
+      }
+    }
+  }
+
+  @Test
+  public void testFloatColumn() {
+    final int positionCount = 10;
+
+    boolean[] nullIndicators = new boolean[positionCount];
+    float[] values = new float[positionCount];
+    for (int i = 0; i < positionCount; i++) {
+      nullIndicators[i] = i % 2 == 0;
+      if (i % 2 != 0) {
+        values[i] = i + i / 10F;
+      }
+    }
+    FloatColumn input = new FloatColumn(positionCount, Optional.of(nullIndicators), values);
+    Int32ArrayColumnEncoder serde = new Int32ArrayColumnEncoder();
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(byteArrayOutputStream);
+    try {
+      serde.writeColumn(dos, input);
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    FloatColumnBuilder floatColumnBuilder = new FloatColumnBuilder(null, positionCount);
+    serde.readColumn(floatColumnBuilder, buffer, positionCount);
+    FloatColumn output = (FloatColumn) floatColumnBuilder.build();
+    Assert.assertEquals(positionCount, output.getPositionCount());
+    Assert.assertTrue(output.mayHaveNull());
+    for (int i = 0; i < positionCount; i++) {
+      Assert.assertEquals(i % 2 == 0, output.isNull(i));
+      if (i % 2 != 0) {
+        Assert.assertEquals(i + i / 10F, output.getFloat(i), 0.001F);
+      }
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int64ArrayColumnEncoderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int64ArrayColumnEncoderTest.java
new file mode 100644
index 0000000000..e094205dfb
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/Int64ArrayColumnEncoderTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.common.block;
+
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Int64ArrayColumnEncoder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+public class Int64ArrayColumnEncoderTest {
+
+  /**
+   * Round-trips a LongColumn whose even positions are null through Int64ArrayColumnEncoder.
+   *
+   * @throws IOException if writing the column fails; propagated so the runner reports the cause
+   */
+  @Test
+  public void testLongColumn() throws IOException {
+    final int positionCount = 10;
+
+    boolean[] nullIndicators = new boolean[positionCount];
+    long[] values = new long[positionCount];
+    for (int i = 0; i < positionCount; i++) {
+      nullIndicators[i] = i % 2 == 0;
+      if (i % 2 != 0) {
+        values[i] = i;
+      }
+    }
+    LongColumn input = new LongColumn(positionCount, Optional.of(nullIndicators), values);
+    Int64ArrayColumnEncoder serde = new Int64ArrayColumnEncoder();
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(byteArrayOutputStream);
+    serde.writeColumn(dos, input);
+
+    ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    LongColumnBuilder longColumnBuilder = new LongColumnBuilder(null, positionCount);
+    serde.readColumn(longColumnBuilder, buffer, positionCount);
+    LongColumn output = (LongColumn) longColumnBuilder.build();
+    Assert.assertEquals(positionCount, output.getPositionCount());
+    Assert.assertTrue(output.mayHaveNull());
+    for (int i = 0; i < positionCount; i++) {
+      Assert.assertEquals(i % 2 == 0, output.isNull(i));
+      if (i % 2 != 0) {
+        Assert.assertEquals(i, output.getLong(i));
+      }
+    }
+  }
+
+  /**
+   * Round-trips a DoubleColumn whose even positions are null through Int64ArrayColumnEncoder.
+   *
+   * @throws IOException if writing the column fails; propagated so the runner reports the cause
+   */
+  @Test
+  public void testDoubleColumn() throws IOException {
+    final int positionCount = 10;
+
+    boolean[] nullIndicators = new boolean[positionCount];
+    double[] values = new double[positionCount];
+    for (int i = 0; i < positionCount; i++) {
+      nullIndicators[i] = i % 2 == 0;
+      if (i % 2 != 0) {
+        values[i] = i + i / 10D;
+      }
+    }
+    DoubleColumn input = new DoubleColumn(positionCount, Optional.of(nullIndicators), values);
+    Int64ArrayColumnEncoder serde = new Int64ArrayColumnEncoder();
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(byteArrayOutputStream);
+    serde.writeColumn(dos, input);
+
+    ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    DoubleColumnBuilder doubleColumnBuilder = new DoubleColumnBuilder(null, positionCount);
+    serde.readColumn(doubleColumnBuilder, buffer, positionCount);
+    DoubleColumn output = (DoubleColumn) doubleColumnBuilder.build();
+    Assert.assertEquals(positionCount, output.getPositionCount());
+    Assert.assertTrue(output.mayHaveNull());
+    for (int i = 0; i < positionCount; i++) {
+      Assert.assertEquals(i % 2 == 0, output.isNull(i));
+      if (i % 2 != 0) {
+        Assert.assertEquals(i + i / 10D, output.getDouble(i), 0.001D);
+      }
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java
new file mode 100644
index 0000000000..40c10c8358
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.common.block;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnEncoding;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsBlockSerdeTest {
+  /**
+   * Serializes a four-column block, verifies the leading buffer fields, then deserializes it.
+   *
+   * @throws IOException if TsBlockSerde raises it; propagated so the runner reports the cause
+   */
+  @Test
+  public void testSerializeAndDeserialize() throws IOException {
+    final int positionCount = 10;
+
+    // TODO: test more data types
+    List<TSDataType> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.INT32);
+    dataTypes.add(TSDataType.FLOAT);
+    dataTypes.add(TSDataType.INT64);
+    dataTypes.add(TSDataType.DOUBLE);
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    ColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder intColumnBuilder = tsBlockBuilder.getColumnBuilder(0);
+    ColumnBuilder floatColumnBuilder = tsBlockBuilder.getColumnBuilder(1);
+    ColumnBuilder longColumnBuilder = tsBlockBuilder.getColumnBuilder(2);
+    ColumnBuilder doubleColumnBuilder = tsBlockBuilder.getColumnBuilder(3);
+    for (int i = 0; i < positionCount; i++) {
+      timeColumnBuilder.writeLong(i);
+      intColumnBuilder.writeInt(i);
+      floatColumnBuilder.writeFloat(i + i / 10F);
+      longColumnBuilder.writeLong(i);
+      doubleColumnBuilder.writeDouble(i + i / 10D);
+      tsBlockBuilder.declarePosition();
+    }
+
+    TsBlockSerde tsBlockSerde = new TsBlockSerde();
+    ByteBuffer output = tsBlockSerde.serialize(tsBlockBuilder.build());
+    output.rewind();
+    int valueColumnCount = output.getInt();
+    Assert.assertEquals(4, valueColumnCount);
+    Assert.assertEquals(TSDataType.INT32, TSDataType.deserialize(output.get()));
+    Assert.assertEquals(TSDataType.FLOAT, TSDataType.deserialize(output.get()));
+    Assert.assertEquals(TSDataType.INT64, TSDataType.deserialize(output.get()));
+    Assert.assertEquals(TSDataType.DOUBLE, TSDataType.deserialize(output.get()));
+    Assert.assertEquals(positionCount, output.getInt());
+    Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output));
+    Assert.assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output));
+    Assert.assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output));
+    Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output));
+    Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output));
+
+    output.rewind();
+    TsBlock tsBlock = tsBlockSerde.deserialize(output);
+    Assert.assertEquals(valueColumnCount, tsBlock.getValueColumnCount());
+    Assert.assertEquals(TSDataType.INT32, tsBlock.getColumn(0).getDataType());
+    Assert.assertEquals(TSDataType.FLOAT, tsBlock.getColumn(1).getDataType());
+    Assert.assertEquals(TSDataType.INT64, tsBlock.getColumn(2).getDataType());
+    Assert.assertEquals(TSDataType.DOUBLE, tsBlock.getColumn(3).getDataType());
+    Assert.assertEquals(positionCount, tsBlock.getPositionCount());
+    Assert.assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(0).getEncoding());
+    Assert.assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(1).getEncoding());
+    Assert.assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(2).getEncoding());
+    Assert.assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(3).getEncoding());
+  }
+}