You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 01:10:35 UTC

[iotdb] branch Vector updated: [To Vector] Support vector write in tsfile (#2799)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch Vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/Vector by this push:
     new 74cd5fb  [To Vector] Support vector write in tsfile (#2799)
74cd5fb is described below

commit 74cd5fb95f8eedec1838d89b3595580e4ece9b84
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Mar 11 09:10:14 2021 +0800

    [To Vector] Support vector write in tsfile (#2799)
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  12 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  12 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  27 +-
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  15 +-
 .../tsfile/file/metadata/enums/TSDataType.java     |   5 +-
 .../file/metadata/statistics/Statistics.java       |  94 ++-----
 .../file/metadata/statistics/TimeStatistics.java   | 161 ++++++++++++
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   4 +
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  60 +++++
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  24 +-
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     |  15 +-
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  | 255 ++++++++++++++++++
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  | 202 ++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |   1 +
 .../iotdb/tsfile/write/page/TimePageWriter.java    | 177 +++++++++++++
 .../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
 .../write/record/datapoint/BooleanDataPoint.java   |   2 +-
 .../write/record/datapoint/DoubleDataPoint.java    |   2 +-
 .../write/record/datapoint/FloatDataPoint.java     |   2 +-
 .../write/record/datapoint/IntDataPoint.java       |   2 +-
 .../write/record/datapoint/LongDataPoint.java      |   2 +-
 .../write/record/datapoint/StringDataPoint.java    |   2 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |  47 ++++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  22 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   3 +-
 .../write/writer/RestorableTsFileIOWriterTest.java |   5 +-
 .../tsfile/write/writer/TestTsFileOutput.java      |  52 ++++
 .../tsfile/write/writer/TimeChunkWriterTest.java   | 111 ++++++++
 .../tsfile/write/writer/TimePageWriterTest.java    | 171 ++++++++++++
 .../tsfile/write/writer/ValueChunkWriterTest.java  | 109 ++++++++
 .../tsfile/write/writer/ValuePageWriterTest.java   | 291 +++++++++++++++++++++
 .../write/writer/VectorChunkWriterImplTest.java    | 178 +++++++++++++
 .../write/writer/VectorMeasurementSchemaStub.java  |  80 ++++++
 34 files changed, 2355 insertions(+), 202 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..caa0893 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -173,22 +173,22 @@ public class MemTableFlushTask {
 
             switch (dataType) {
               case BOOLEAN:
-                seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+                seriesWriterImpl.write(time, tvPairs.getBoolean(i), false);
                 break;
               case INT32:
-                seriesWriterImpl.write(time, tvPairs.getInt(i));
+                seriesWriterImpl.write(time, tvPairs.getInt(i), false);
                 break;
               case INT64:
-                seriesWriterImpl.write(time, tvPairs.getLong(i));
+                seriesWriterImpl.write(time, tvPairs.getLong(i), false);
                 break;
               case FLOAT:
-                seriesWriterImpl.write(time, tvPairs.getFloat(i));
+                seriesWriterImpl.write(time, tvPairs.getFloat(i), false);
                 break;
               case DOUBLE:
-                seriesWriterImpl.write(time, tvPairs.getDouble(i));
+                seriesWriterImpl.write(time, tvPairs.getDouble(i), false);
                 break;
               case TEXT:
-                seriesWriterImpl.write(time, tvPairs.getBinary(i));
+                seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
                 break;
               default:
                 LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index b7e0842..b6e2316 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -412,22 +412,22 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
       getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
       switch (schema.getType()) {
         case INT32:
-          chunkWriter.write(time, (int) value);
+          chunkWriter.write(time, (int) value, false);
           break;
         case INT64:
-          chunkWriter.write(time, (long) value);
+          chunkWriter.write(time, (long) value, false);
           break;
         case FLOAT:
-          chunkWriter.write(time, (float) value);
+          chunkWriter.write(time, (float) value, false);
           break;
         case DOUBLE:
-          chunkWriter.write(time, (double) value);
+          chunkWriter.write(time, (double) value, false);
           break;
         case BOOLEAN:
-          chunkWriter.write(time, (boolean) value);
+          chunkWriter.write(time, (boolean) value, false);
           break;
         case TEXT:
-          chunkWriter.write(time, (Binary) value);
+          chunkWriter.write(time, (Binary) value, false);
           break;
         default:
           throw new UnSupportedDataTypeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 973cb24..2dbaca3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -53,22 +53,25 @@ public class MergeUtils {
   public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
     switch (chunkWriter.getDataType()) {
       case TEXT:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+        chunkWriter.write(
+            timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary(), false);
         break;
       case DOUBLE:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+        chunkWriter.write(
+            timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble(), false);
         break;
       case BOOLEAN:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+        chunkWriter.write(
+            timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean(), false);
         break;
       case INT64:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong(), false);
         break;
       case INT32:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt(), false);
         break;
       case FLOAT:
-        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat(), false);
         break;
       default:
         throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
@@ -109,22 +112,22 @@ public class MergeUtils {
   public static void writeBatchPoint(BatchData batchData, int i, IChunkWriter chunkWriter) {
     switch (chunkWriter.getDataType()) {
       case TEXT:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBinaryByIndex(i), false);
         break;
       case DOUBLE:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), false);
         break;
       case BOOLEAN:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getBooleanByIndex(i), false);
         break;
       case INT64:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getLongByIndex(i), false);
         break;
       case INT32:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getIntByIndex(i), false);
         break;
       case FLOAT:
-        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i));
+        chunkWriter.write(batchData.getTimeByIndex(i), batchData.getFloatByIndex(i), false);
         break;
       default:
         throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 663da60..48c9134 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -58,8 +58,21 @@ public class ChunkHeader {
       CompressionType compressionType,
       TSEncoding encoding,
       int numOfPages) {
+    this(measurementID, dataSize, dataType, compressionType, encoding, numOfPages, 0);
+  }
+
+  public ChunkHeader(
+      String measurementID,
+      int dataSize,
+      TSDataType dataType,
+      CompressionType compressionType,
+      TSEncoding encoding,
+      int numOfPages,
+      int mask) {
     this(
-        numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER,
+        (byte)
+            ((numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER)
+                | mask),
         measurementID,
         dataSize,
         getSerializedSize(measurementID, dataSize),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 0edda45..a2b7eef 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -41,7 +41,10 @@ public enum TSDataType {
   DOUBLE((byte) 4),
 
   /** TEXT */
-  TEXT((byte) 5);
+  TEXT((byte) 5),
+
+  /** Vector */
+  Vector((byte) 6);
 
   private final byte type;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 1ec2001..1c1ba7f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -77,6 +77,8 @@ public abstract class Statistics<T> {
         return new DoubleStatistics();
       case FLOAT:
         return new FloatStatistics();
+      case Vector:
+        return new TimeStatistics();
       default:
         throw new UnknownColumnTypeException(type.toString());
     }
@@ -189,61 +191,36 @@ public abstract class Statistics<T> {
   }
 
   public void update(long time, boolean value) {
-    if (time < this.startTime) {
-      startTime = time;
-    }
-    if (time > this.endTime) {
-      endTime = time;
-    }
-    count++;
+    update(time);
     updateStats(value);
   }
 
   public void update(long time, int value) {
-    if (time < this.startTime) {
-      startTime = time;
-    }
-    if (time > this.endTime) {
-      endTime = time;
-    }
-    count++;
+    update(time);
     updateStats(value);
   }
 
   public void update(long time, long value) {
-    if (time < this.startTime) {
-      startTime = time;
-    }
-    if (time > this.endTime) {
-      endTime = time;
-    }
-    count++;
+    update(time);
     updateStats(value);
   }
 
   public void update(long time, float value) {
-    if (time < this.startTime) {
-      startTime = time;
-    }
-    if (time > this.endTime) {
-      endTime = time;
-    }
-    count++;
+    update(time);
     updateStats(value);
   }
 
   public void update(long time, double value) {
-    if (time < this.startTime) {
-      startTime = time;
-    }
-    if (time > this.endTime) {
-      endTime = time;
-    }
-    count++;
+    update(time);
     updateStats(value);
   }
 
   public void update(long time, Binary value) {
+    update(time);
+    updateStats(value);
+  }
+
+  public void update(long time) {
     if (time < startTime) {
       startTime = time;
     }
@@ -251,65 +228,39 @@ public abstract class Statistics<T> {
       endTime = time;
     }
     count++;
-    updateStats(value);
   }
 
   public void update(long[] time, boolean[] values, int batchSize) {
-    if (time[0] < startTime) {
-      startTime = time[0];
-    }
-    if (time[batchSize - 1] > this.endTime) {
-      endTime = time[batchSize - 1];
-    }
-    count += batchSize;
+    update(time, batchSize);
     updateStats(values, batchSize);
   }
 
   public void update(long[] time, int[] values, int batchSize) {
-    if (time[0] < startTime) {
-      startTime = time[0];
-    }
-    if (time[batchSize - 1] > this.endTime) {
-      endTime = time[batchSize - 1];
-    }
-    count += batchSize;
+    update(time, batchSize);
     updateStats(values, batchSize);
   }
 
   public void update(long[] time, long[] values, int batchSize) {
-    if (time[0] < startTime) {
-      startTime = time[0];
-    }
-    if (time[batchSize - 1] > this.endTime) {
-      endTime = time[batchSize - 1];
-    }
-    count += batchSize;
+    update(time, batchSize);
     updateStats(values, batchSize);
   }
 
   public void update(long[] time, float[] values, int batchSize) {
-    if (time[0] < startTime) {
-      startTime = time[0];
-    }
-    if (time[batchSize - 1] > this.endTime) {
-      endTime = time[batchSize - 1];
-    }
-    count += batchSize;
+    update(time, batchSize);
     updateStats(values, batchSize);
   }
 
   public void update(long[] time, double[] values, int batchSize) {
-    if (time[0] < startTime) {
-      startTime = time[0];
-    }
-    if (time[batchSize - 1] > this.endTime) {
-      endTime = time[batchSize - 1];
-    }
-    count += batchSize;
+    update(time, batchSize);
     updateStats(values, batchSize);
   }
 
   public void update(long[] time, Binary[] values, int batchSize) {
+    update(time, batchSize);
+    updateStats(values, batchSize);
+  }
+
+  public void update(long[] time, int batchSize) {
     if (time[0] < startTime) {
       startTime = time[0];
     }
@@ -317,7 +268,6 @@ public abstract class Statistics<T> {
       endTime = time[batchSize - 1];
     }
     count += batchSize;
-    updateStats(values, batchSize);
   }
 
   protected abstract void mergeStatisticsValue(Statistics stats);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
new file mode 100644
index 0000000..74bd701
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata.statistics;
+
+import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class TimeStatistics extends Statistics {
+
+  static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40;
+
+  @Override
+  public TSDataType getType() {
+    return TSDataType.Vector;
+  }
+
+  @Override
+  public int getStatsSize() {
+    return 0;
+  }
+
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    throw new StatisticsClassException("Time statistics does not support: set min max from bytes");
+  }
+
+  @Override
+  public Long getMinValue() {
+    throw new StatisticsClassException("Time statistics does not support: min value");
+  }
+
+  @Override
+  public Long getMaxValue() {
+    throw new StatisticsClassException("Time statistics does not support: max value");
+  }
+
+  @Override
+  public Long getFirstValue() {
+    throw new StatisticsClassException("Time statistics does not support: first value");
+  }
+
+  @Override
+  public Long getLastValue() {
+    throw new StatisticsClassException("Time statistics does not support: last value");
+  }
+
+  @Override
+  public double getSumDoubleValue() {
+    throw new StatisticsClassException("Time statistics does not support: double sum");
+  }
+
+  @Override
+  public long getSumLongValue() {
+    throw new StatisticsClassException("Time statistics does not support: long sum");
+  }
+
+  @Override
+  void updateStats(long value) {
+    throw new StatisticsClassException("Time statistics does not support: update stats");
+  }
+
+  @Override
+  void updateStats(long[] values, int batchSize) {
+    throw new StatisticsClassException("Time statistics does not support: update stats");
+  }
+
+  @Override
+  public void updateStats(long minValue, long maxValue) {
+    throw new StatisticsClassException("Time statistics does not support: update stats");
+  }
+
+  @Override
+  public long calculateRamSize() {
+    return TIME_STATISTICS_FIXED_RAM_SIZE;
+  }
+
+  @Override
+  protected void mergeStatisticsValue(Statistics stats) {}
+
+  @Override
+  public byte[] getMinValueBytes() {
+    throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+  }
+
+  @Override
+  public byte[] getMaxValueBytes() {
+    throw new StatisticsClassException("Time statistics does not support: get max value bytes");
+  }
+
+  @Override
+  public byte[] getFirstValueBytes() {
+    throw new StatisticsClassException("Time statistics does not support: get first value bytes");
+  }
+
+  @Override
+  public byte[] getLastValueBytes() {
+    throw new StatisticsClassException("Time statistics does not support: get last value bytes");
+  }
+
+  @Override
+  public byte[] getSumValueBytes() {
+    throw new StatisticsClassException("Time statistics does not support: get sum value bytes");
+  }
+
+  @Override
+  public ByteBuffer getMinValueBuffer() {
+    throw new StatisticsClassException("Time statistics does not support: get min value buffer");
+  }
+
+  @Override
+  public ByteBuffer getMaxValueBuffer() {
+    throw new StatisticsClassException("Time statistics does not support: get max value buffer");
+  }
+
+  @Override
+  public ByteBuffer getFirstValueBuffer() {
+    throw new StatisticsClassException("Time statistics does not support: get first value buffer");
+  }
+
+  @Override
+  public ByteBuffer getLastValueBuffer() {
+    throw new StatisticsClassException("Time statistics does not support: get last value buffer");
+  }
+
+  @Override
+  public ByteBuffer getSumValueBuffer() {
+    throw new StatisticsClassException("Time statistics does not support: get sum value buffer");
+  }
+
+  @Override
+  public int serializeStats(OutputStream outputStream) {
+    return 0;
+  }
+
+  @Override
+  public void deserialize(InputStream inputStream) throws IOException {}
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {}
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 881db03..61efe8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -86,4 +86,8 @@ public class PublicBAOS extends ByteArrayOutputStream {
   public int size() {
     return count;
   }
+
+  public void truncate(int size) {
+    count = size;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 73b01d2..6c00f43 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -45,6 +45,8 @@ public abstract class TsPrimitiveType implements Serializable {
         return new TsPrimitiveType.TsDouble((double) v);
       case TEXT:
         return new TsPrimitiveType.TsBinary((Binary) v);
+      case Vector:
+        return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v);
       default:
         throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
     }
@@ -74,6 +76,10 @@ public abstract class TsPrimitiveType implements Serializable {
     throw new UnsupportedOperationException("getBinary() is not supported for current sub-class");
   }
 
+  public TsPrimitiveType[] getVector() {
+    throw new UnsupportedOperationException("getVector() is not supported for current sub-class");
+  }
+
   public void setBoolean(boolean val) {
     throw new UnsupportedOperationException("setBoolean() is not supported for current sub-class");
   }
@@ -98,6 +104,10 @@ public abstract class TsPrimitiveType implements Serializable {
     throw new UnsupportedOperationException("setBinary() is not supported for current sub-class");
   }
 
+  public void setVector(TsPrimitiveType[] val) {
+    throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
+  }
+
   /**
    * get the size of one instance of current class.
    *
@@ -462,4 +472,54 @@ public abstract class TsPrimitiveType implements Serializable {
       return false;
     }
   }
+
+  public static class TsVector extends TsPrimitiveType {
+
+    private TsPrimitiveType[] value;
+
+    public TsVector(TsPrimitiveType[] value) {
+      this.value = value;
+    }
+
+    @Override
+    public TsPrimitiveType[] getVector() {
+      return value;
+    }
+
+    @Override
+    public void setVector(TsPrimitiveType[] val) {
+      this.value = val;
+    }
+
+    @Override
+    public int getSize() {
+      int size = 0;
+      for (TsPrimitiveType type : value) {
+        size += type.getSize();
+      }
+      // object header + array object header
+      return 4 + 4 + size;
+    }
+
+    @Override
+    public Object getValue() {
+      return getVector();
+    }
+
+    @Override
+    public String getStringValue() {
+      StringBuilder builder = new StringBuilder("[");
+      builder.append(value[0].getStringValue());
+      for (int i = 1; i < value.length; i++) {
+        builder.append(", ").append(value[i].getStringValue());
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    public TSDataType getDataType() {
+      return TSDataType.Vector;
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 2c00f4a..2beda20 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -145,7 +145,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long time, long value) {
+  public void write(long time, long value, boolean isNull) {
     // store last point for sdtEncoding, it still needs to go through encoding process
     // in case it exceeds compdev and needs to store second last point
     if (!isSdtEncoding || sdtEncoder.encodeLong(time, value)) {
@@ -160,7 +160,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long time, int value) {
+  public void write(long time, int value, boolean isNull) {
     if (!isSdtEncoding || sdtEncoder.encodeInt(time, value)) {
       pageWriter.write(
           isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -173,13 +173,13 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long time, boolean value) {
+  public void write(long time, boolean value, boolean isNull) {
     pageWriter.write(time, value);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long time, float value) {
+  public void write(long time, float value, boolean isNull) {
     if (!isSdtEncoding || sdtEncoder.encodeFloat(time, value)) {
       pageWriter.write(
           isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -193,7 +193,7 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long time, double value) {
+  public void write(long time, double value, boolean isNull) {
     if (!isSdtEncoding || sdtEncoder.encodeDouble(time, value)) {
       pageWriter.write(
           isSdtEncoding ? sdtEncoder.getTime() : time,
@@ -206,12 +206,17 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long time, Binary value) {
+  public void write(long time, Binary value, boolean isNull) {
     pageWriter.write(time, value);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
+  public void write(long time) {
+    throw new IllegalStateException("write time method is not implemented in common chunk writer");
+  }
+
+  @Override
   public void write(long[] timestamps, int[] values, int batchSize) {
     if (isSdtEncoding) {
       batchSize = sdtEncoder.encode(timestamps, values, batchSize);
@@ -412,7 +417,7 @@ public class ChunkWriterImpl implements IChunkWriter {
    * @param statistics the chunk statistics
    * @throws IOException exception in IO
    */
-  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics)
+  private void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer, Statistics<?> statistics)
       throws IOException {
     if (statistics.getCount() == 0) {
       return;
@@ -420,13 +425,14 @@ public class ChunkWriterImpl implements IChunkWriter {
 
     // start to write this column chunk
     writer.startFlushChunk(
-        measurementSchema,
+        measurementSchema.getMeasurementId(),
         compressor.getType(),
         measurementSchema.getType(),
         measurementSchema.getEncodingType(),
         statistics,
         pageBuffer.size(),
-        numOfPages);
+        numOfPages,
+        0);
 
     long dataOffset = writer.getPos();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
index cab9615..963ef64 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
@@ -28,22 +28,25 @@ import java.io.IOException;
 public interface IChunkWriter {
 
   /** write a time value pair. */
-  void write(long time, int value);
+  void write(long time, int value, boolean isNull);
 
   /** write a time value pair. */
-  void write(long time, long value);
+  void write(long time, long value, boolean isNull);
 
   /** write a time value pair. */
-  void write(long time, boolean value);
+  void write(long time, boolean value, boolean isNull);
 
   /** write a time value pair. */
-  void write(long time, float value);
+  void write(long time, float value, boolean isNull);
 
   /** write a time value pair. */
-  void write(long time, double value);
+  void write(long time, double value, boolean isNull);
 
   /** write a time value pair. */
-  void write(long time, Binary value);
+  void write(long time, Binary value, boolean isNull);
+
+  /** write a time. */
+  void write(long time);
 
   /** write time series */
   void write(long[] timestamps, int[] values, int batchSize);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
new file mode 100644
index 0000000..df9ded9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TimeChunkWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
+
+  private final String measurementId;
+
+  private final TSEncoding encodingType;
+
+  private final CompressionType compressionType;
+
+  /** all pages of this chunk. */
+  private final PublicBAOS pageBuffer;
+
+  private int numOfPages;
+
+  /** write data into current page */
+  private TimePageWriter pageWriter;
+
+  /** page size threshold. */
+  private final long pageSizeThreshold;
+
+  private final int maxNumberOfPointsInPage;
+
+  /** value count in current page. */
+  private int valueCountInOnePageForNextCheck;
+
+  // initial value for valueCountInOnePageForNextCheck
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+
+  /** statistic of this chunk. */
+  private TimeStatistics statistics;
+
+  /** first page info */
+  private int sizeWithoutStatistic;
+
+  private Statistics<?> firstPageStatistics;
+
+  public TimeChunkWriter(
+      String measurementId,
+      CompressionType compressionType,
+      TSEncoding encodingType,
+      Encoder timeEncoder) {
+    this.measurementId = measurementId;
+    this.encodingType = encodingType;
+    this.compressionType = compressionType;
+    this.pageBuffer = new PublicBAOS();
+
+    this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxNumberOfPointsInPage =
+        TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    // initial check of memory usage. So that we have enough data to make an initial prediction
+    this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+    // init statistics for this chunk and page
+    this.statistics = new TimeStatistics();
+
+    this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+  }
+
+  public void write(long time) {
+    pageWriter.write(time);
+  }
+
+  public void write(long[] timestamps, int batchSize) {
+    pageWriter.write(timestamps, batchSize);
+  }
+
+  /**
+   * check occupied memory size of the current page; return true if the point count reaches the
+   * per-page limit or the estimated size exceeds the PageSize threshold (caller seals the page)
+   */
+  public boolean checkPageSizeAndMayOpenANewPage() {
+    if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
+      logger.debug("current line count reaches the upper bound, write page {}", measurementId);
+      return true;
+    } else if (pageWriter.getPointNumber()
+        >= valueCountInOnePageForNextCheck) { // need to check memory size
+      // not checking the memory used for every value
+      long currentPageSize = pageWriter.estimateMaxMemSize();
+      if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+        // we will write the current page
+        logger.debug(
+            "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
+            measurementId,
+            pageSizeThreshold,
+            currentPageSize,
+            pageWriter.getPointNumber());
+        valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+        return true;
+      } else {
+        // reset the valueCountInOnePageForNextCheck for the next page
+        valueCountInOnePageForNextCheck =
+            (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
+      }
+    }
+    return false;
+  }
+
+  public void writePageToPageBuffer() {
+    try {
+      if (numOfPages == 0) { // record the firstPageStatistics
+        this.firstPageStatistics = pageWriter.getStatistics();
+        this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        byte[] b = pageBuffer.toByteArray();
+        pageBuffer.reset();
+        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+        firstPageStatistics.serialize(pageBuffer);
+        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+        firstPageStatistics = null;
+      } else {
+        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+      }
+
+      // update statistics of this chunk
+      numOfPages++;
+      this.statistics.mergeStatistics(pageWriter.getStatistics());
+    } catch (IOException e) {
+      logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+    } finally {
+      // clear start time stamp for next initializing
+      pageWriter.reset();
+    }
+  }
+
+  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+    sealCurrentPage();
+    writeAllPagesOfChunkToTsFile(tsfileWriter);
+
+    // reinit this chunk writer
+    pageBuffer.reset();
+    numOfPages = 0;
+    firstPageStatistics = null;
+    this.statistics = new TimeStatistics();
+  }
+
+  public long estimateMaxSeriesMemSize() {
+    return pageBuffer.size()
+        + pageWriter.estimateMaxMemSize()
+        + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+        + pageWriter.getStatistics().getSerializedSize();
+  }
+
+  public long getCurrentChunkSize() {
+    if (pageBuffer.size() == 0) {
+      return 0;
+    }
+    // return the serialized size of the chunk header + all pages
+    return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+        + (long) pageBuffer.size();
+  }
+
+  public void sealCurrentPage() {
+    if (pageWriter != null && pageWriter.getPointNumber() > 0) {
+      writePageToPageBuffer();
+    }
+  }
+
+  public void clearPageWriter() {
+    pageWriter = null;
+  }
+
+  public int getNumOfPages() {
+    return numOfPages;
+  }
+
+  public TSDataType getDataType() {
+    return TSDataType.Vector;
+  }
+
+  /**
+   * write the page to specified IOWriter.
+   *
+   * @param writer the specified IOWriter
+   * @throws IOException exception in IO
+   */
+  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+    if (statistics.getCount() == 0) {
+      return;
+    }
+
+    // start to write this column chunk
+    writer.startFlushChunk(
+        measurementId,
+        compressionType,
+        TSDataType.Vector,
+        encodingType,
+        statistics,
+        pageBuffer.size(),
+        numOfPages,
+        0x80);
+
+    long dataOffset = writer.getPos();
+
+    // write all pages of this column
+    writer.writeBytesToStream(pageBuffer);
+
+    int dataSize = (int) (writer.getPos() - dataOffset);
+    if (dataSize != pageBuffer.size()) {
+      throw new IOException(
+          "Bytes written is inconsistent with the size of data: "
+              + dataSize
+              + " !="
+              + " "
+              + pageBuffer.size());
+    }
+
+    writer.endCurrentChunk();
+  }
+
+  /** only used for test */
+  public PublicBAOS getPageBuffer() {
+    return pageBuffer;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
new file mode 100644
index 0000000..17e137d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ValueChunkWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(ValueChunkWriter.class);
+
+  private final String measurementId;
+
+  private final TSEncoding encodingType;
+
+  private final TSDataType dataType;
+
+  private final CompressionType compressionType;
+
+  /** all pages of this chunk. */
+  private final PublicBAOS pageBuffer;
+
+  private int numOfPages;
+
+  /** write data into current page */
+  private ValuePageWriter pageWriter;
+
+  /** statistic of this chunk. */
+  private Statistics<?> statistics;
+
+  /** first page info */
+  private int sizeWithoutStatistic;
+
+  private Statistics<?> firstPageStatistics;
+
+  public ValueChunkWriter(
+      String measurementId,
+      CompressionType compressionType,
+      TSDataType dataType,
+      TSEncoding encodingType,
+      Encoder valueEncoder) {
+    this.measurementId = measurementId;
+    this.encodingType = encodingType;
+    this.dataType = dataType;
+    this.compressionType = compressionType;
+    this.pageBuffer = new PublicBAOS();
+
+    // init statistics for this chunk and page
+    this.statistics = Statistics.getStatsByType(dataType);
+
+    this.pageWriter =
+        new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
+  }
+
+  public void write(long time, long value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long time, int value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long time, boolean value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long time, float value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long time, double value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long time, Binary value, boolean isNull) {
+    pageWriter.write(time, value, isNull);
+  }
+
+  public void write(long[] timestamps, int[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void write(long[] timestamps, long[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void write(long[] timestamps, float[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void write(long[] timestamps, double[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+  }
+
+  public void writePageToPageBuffer() {
+    try {
+      if (numOfPages == 0) { // record the firstPageStatistics
+        this.firstPageStatistics = pageWriter.getStatistics();
+        this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+        byte[] b = pageBuffer.toByteArray();
+        pageBuffer.reset();
+        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+        firstPageStatistics.serialize(pageBuffer);
+        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+        firstPageStatistics = null;
+      } else {
+        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+      }
+
+      // update statistics of this chunk
+      numOfPages++;
+      this.statistics.mergeStatistics(pageWriter.getStatistics());
+    } catch (IOException e) {
+      logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+    } finally {
+      // clear start time stamp for next initializing
+      pageWriter.reset(dataType);
+    }
+  }
+
+  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+    sealCurrentPage();
+    writeAllPagesOfChunkToTsFile(tsfileWriter);
+
+    // reinit this chunk writer
+    pageBuffer.reset();
+    numOfPages = 0;
+    firstPageStatistics = null;
+    this.statistics = Statistics.getStatsByType(dataType);
+  }
+
+  public long estimateMaxSeriesMemSize() {
+    return pageBuffer.size()
+        + pageWriter.estimateMaxMemSize()
+        + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+        + pageWriter.getStatistics().getSerializedSize();
+  }
+
+  public long getCurrentChunkSize() {
+    if (pageBuffer.size() == 0) {
+      return 0;
+    }
+    // return the serialized size of the chunk header + all pages
+    return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+        + (long) pageBuffer.size();
+  }
+
+  public void sealCurrentPage() {
+    // if the page contains no points, we still need to serialize it
+    if (pageWriter != null && pageWriter.getSize() != 0) {
+      writePageToPageBuffer();
+    }
+  }
+
+  public void clearPageWriter() {
+    pageWriter = null;
+  }
+
+  public int getNumOfPages() {
+    return numOfPages;
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * write the page to specified IOWriter.
+   *
+   * @param writer the specified IOWriter
+   * @throws IOException exception in IO
+   */
+  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+    if (statistics.getCount() == 0) {
+      return;
+    }
+
+    // start to write this column chunk
+    writer.startFlushChunk(
+        measurementId,
+        compressionType,
+        dataType,
+        encodingType,
+        statistics,
+        pageBuffer.size(),
+        numOfPages,
+        0x40);
+
+    long dataOffset = writer.getPos();
+
+    // write all pages of this column
+    writer.writeBytesToStream(pageBuffer);
+
+    int dataSize = (int) (writer.getPos() - dataOffset);
+    if (dataSize != pageBuffer.size()) {
+      throw new IOException(
+          "Bytes written is inconsistent with the size of data: "
+              + dataSize
+              + " !="
+              + " "
+              + pageBuffer.size());
+    }
+
+    writer.endCurrentChunk();
+  }
+
+  /** only used for test */
+  public PublicBAOS getPageBuffer() {
+    return pageBuffer;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
new file mode 100644
index 0000000..af71ecd
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class VectorChunkWriterImpl implements IChunkWriter {
+
+  private final TimeChunkWriter timeChunkWriter;
+  private final List<ValueChunkWriter> valueChunkWriterList;
+  private int valueIndex;
+
+  /** @param schema schema of this measurement */
+  public VectorChunkWriterImpl(IMeasurementSchema schema) {
+    timeChunkWriter =
+        new TimeChunkWriter(
+            schema.getMeasurementId(),
+            schema.getCompressor(),
+            schema.getTimeTSEncoding(),
+            schema.getTimeEncoder());
+
+    List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+    List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList();
+    List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList();
+    List<Encoder> valueEncoderList = schema.getValueEncoderList();
+
+    valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+    for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+      valueChunkWriterList.add(
+          new ValueChunkWriter(
+              valueMeasurementIdList.get(i),
+              schema.getCompressor(),
+              valueTSDataTypeList.get(i),
+              valueTSEncodingList.get(i),
+              valueEncoderList.get(i)));
+    }
+
+    this.valueIndex = 0;
+  }
+
+  @Override
+  public void write(long time, int value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time, long value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time, boolean value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time, float value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time, double value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time, Binary value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+  }
+
+  @Override
+  public void write(long time) {
+    valueIndex = 0;
+    timeChunkWriter.write(time);
+    if (checkPageSizeAndMayOpenANewPage()) {
+      writePageToPageBuffer();
+    }
+  }
+
+  // TODO tsfile write interface
+  @Override
+  public void write(long[] timestamps, int[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(long[] timestamps, long[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(long[] timestamps, float[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(long[] timestamps, double[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * check occupied memory size of the time page; return true if the current page should be
+   * sealed (the caller then flushes the time page and all value pages to their pageBuffers)
+   */
+  private boolean checkPageSizeAndMayOpenANewPage() {
+    return timeChunkWriter.checkPageSizeAndMayOpenANewPage();
+  }
+
+  private void writePageToPageBuffer() {
+    timeChunkWriter.writePageToPageBuffer();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      valueChunkWriter.writePageToPageBuffer();
+    }
+  }
+
+  @Override
+  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+    timeChunkWriter.writeToFileWriter(tsfileWriter);
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      valueChunkWriter.writeToFileWriter(tsfileWriter);
+    }
+  }
+
+  @Override
+  public long estimateMaxSeriesMemSize() {
+    long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
+    }
+    return estimateMaxSeriesMemSize;
+  }
+
+  @Override
+  public long getCurrentChunkSize() {
+    long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      currentChunkSize += valueChunkWriter.getCurrentChunkSize();
+    }
+    return currentChunkSize;
+  }
+
+  @Override
+  public void sealCurrentPage() {
+    timeChunkWriter.sealCurrentPage();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      valueChunkWriter.sealCurrentPage();
+    }
+  }
+
+  @Override
+  public void clearPageWriter() {
+    timeChunkWriter.clearPageWriter();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      valueChunkWriter.clearPageWriter();
+    }
+  }
+
+  @Override
+  public int getNumOfPages() {
+    return timeChunkWriter.getNumOfPages();
+  }
+
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.Vector;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 8467d15..369b809 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -265,6 +265,7 @@ public class PageWriter {
   /** reset this page */
   public void reset(MeasurementSchema measurementSchema) {
     timeOut.reset();
+    timeEncoder = measurementSchema.getTimeEncoder();
     valueOut.reset();
     statistics = Statistics.getStatsByType(measurementSchema.getType());
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
new file mode 100644
index 0000000..5223d18
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/TimePageWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.page;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This writer is used to write time into a page. It consists of a time encoder and respective
+ * OutputStream.
+ */
+public class TimePageWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(TimePageWriter.class);
+
+  private final ICompressor compressor;
+
+  // time
+  private Encoder timeEncoder;
+  private final PublicBAOS timeOut;
+
+  /**
+   * statistic of current page. It will be reset after calling {@code
+   * writePageHeaderAndDataIntoBuff()}
+   */
+  private TimeStatistics statistics;
+
+  public TimePageWriter(Encoder timeEncoder, ICompressor compressor) {
+    this.timeOut = new PublicBAOS();
+    this.timeEncoder = timeEncoder;
+    this.statistics = new TimeStatistics();
+    this.compressor = compressor;
+  }
+
+  /** write a time into encoder */
+  public void write(long time) {
+    timeEncoder.encode(time, timeOut);
+    statistics.update(time);
+  }
+
+  /** write time series into encoder */
+  public void write(long[] timestamps, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+    }
+    statistics.update(timestamps, batchSize);
+  }
+
+  /** flush all data remaining in the time encoder. */
+  private void prepareEndWriteOnePage() throws IOException {
+    timeEncoder.flush(timeOut);
+  }
+
+  /**
+   * getUncompressedBytes returns the data that has been written so far, in the form of <code>
+   * encoded time list</code> (this writer holds timestamps only, no values).
+   *
+   * @return a new readable ByteBuffer whose position is 0.
+   */
+  public ByteBuffer getUncompressedBytes() throws IOException {
+    prepareEndWriteOnePage();
+    ByteBuffer buffer = ByteBuffer.allocate(timeOut.size());
+    buffer.put(timeOut.getBuf(), 0, timeOut.size());
+    buffer.flip();
+    return buffer;
+  }
+
+  /** write the page header and data into the given pageBuffer output stream. */
+  public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first)
+      throws IOException {
+    if (statistics.getCount() == 0) {
+      return 0;
+    }
+
+    ByteBuffer pageData = getUncompressedBytes();
+    int uncompressedSize = pageData.remaining();
+    int compressedSize;
+    byte[] compressedBytes = null;
+
+    if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+      compressedSize = uncompressedSize;
+    } else {
+      compressedBytes = new byte[compressor.getMaxBytesForCompression(uncompressedSize)];
+      // data is never a directByteBuffer now, so we can use data.array()
+      compressedSize =
+          compressor.compress(
+              pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
+    }
+
+    // write the page header to IOWriter
+    int sizeWithoutStatistic = 0;
+    if (first) {
+      sizeWithoutStatistic +=
+          ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer);
+      sizeWithoutStatistic +=
+          ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer);
+    } else {
+      ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer);
+      ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer);
+      statistics.serialize(pageBuffer);
+    }
+
+    // write page content to temp PBAOS
+    logger.trace(
+        "start to flush a time page data into buffer, buffer position {} ", pageBuffer.size());
+    if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+      try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+        channel.write(pageData);
+      }
+    } else {
+      pageBuffer.write(compressedBytes, 0, compressedSize);
+    }
+    logger.trace(
+        "finish flushing a time page data into buffer, buffer position {} ", pageBuffer.size());
+    return sizeWithoutStatistic;
+  }
+
+  /**
+   * calculate max possible memory size this page occupies, including the time outputStream and
+   * the bytes still buffered in the time encoder, which are not flushed until the page ends.
+   *
+   * @return allocated size of the time outputStream plus the encoder's max byte size
+   */
+  public long estimateMaxMemSize() {
+    return timeOut.size() + timeEncoder.getMaxByteSize();
+  }
+
+  /** reset this page */
+  public void reset() {
+    timeOut.reset();
+    statistics = new TimeStatistics();
+  }
+
+  public void setTimeEncoder(Encoder encoder) {
+    this.timeEncoder = encoder;
+  }
+
+  public void initStatistics() {
+    statistics = new TimeStatistics();
+  }
+
+  public long getPointNumber() {
+    return statistics.getCount();
+  }
+
+  public TimeStatistics getStatistics() {
+    return statistics;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
similarity index 69%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
index 8467d15..0fbfdbd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/ValuePageWriter.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,21 +36,17 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 
 /**
- * This writer is used to write time-value into a page. It consists of a time encoder, a value
- * encoder and respective OutputStream.
+ * This writer is used to write value into a page. It consists of a value encoder and respective
+ * OutputStream.
  */
-public class PageWriter {
+public class ValuePageWriter {
+  private static final Logger logger = LoggerFactory.getLogger(ValuePageWriter.class);
 
-  private static final Logger logger = LoggerFactory.getLogger(PageWriter.class);
+  private final ICompressor compressor;
 
-  private ICompressor compressor;
-
-  // time
-  private Encoder timeEncoder;
-  private PublicBAOS timeOut;
   // value
   private Encoder valueEncoder;
-  private PublicBAOS valueOut;
+  private final PublicBAOS valueOut;
 
   /**
    * statistic of current page. It will be reset after calling {@code
@@ -59,76 +54,101 @@ public class PageWriter {
    */
   private Statistics<?> statistics;
 
-  public PageWriter() {
-    this(null, null);
-  }
+  private byte bitmap;
 
-  public PageWriter(MeasurementSchema measurementSchema) {
-    this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder());
-    this.statistics = Statistics.getStatsByType(measurementSchema.getType());
-    this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
-  }
+  private int size;
 
-  private PageWriter(Encoder timeEncoder, Encoder valueEncoder) {
-    this.timeOut = new PublicBAOS();
+  private final PublicBAOS bitmapOut;
+
+  private static final int MASK = 1 << 7;
+
+  public ValuePageWriter(Encoder valueEncoder, ICompressor compressor, TSDataType dataType) {
     this.valueOut = new PublicBAOS();
-    this.timeEncoder = timeEncoder;
+    this.bitmap = 0;
+    this.size = 0;
+    this.bitmapOut = new PublicBAOS();
     this.valueEncoder = valueEncoder;
+    this.statistics = Statistics.getStatsByType(dataType);
+    this.compressor = compressor;
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, boolean value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, boolean value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, short value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, short value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, int value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, int value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, long value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, long value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, float value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, float value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, double value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, double value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
   }
 
   /** write a time value pair into encoder */
-  public void write(long time, Binary value) {
-    timeEncoder.encode(time, timeOut);
-    valueEncoder.encode(value, valueOut);
-    statistics.update(time, value);
+  public void write(long time, Binary value, boolean isNull) {
+    setBit(isNull);
+    if (!isNull) {
+      valueEncoder.encode(value, valueOut);
+      statistics.update(time, value);
+    }
+  }
+
+  private void setBit(boolean isNull) {
+    if (!isNull) {
+      bitmap |= (MASK >>> (size % 8));
+    }
+    size++;
+    if (size % 8 == 0) {
+      bitmapOut.write(bitmap);
+      bitmap = 0;
+    }
   }
 
   /** write time series into encoder */
   public void write(long[] timestamps, boolean[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -137,7 +157,6 @@ public class PageWriter {
   /** write time series into encoder */
   public void write(long[] timestamps, int[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -146,7 +165,6 @@ public class PageWriter {
   /** write time series into encoder */
   public void write(long[] timestamps, long[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -155,7 +173,6 @@ public class PageWriter {
   /** write time series into encoder */
   public void write(long[] timestamps, float[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -164,7 +181,6 @@ public class PageWriter {
   /** write time series into encoder */
   public void write(long[] timestamps, double[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -173,7 +189,6 @@ public class PageWriter {
   /** write time series into encoder */
   public void write(long[] timestamps, Binary[] values, int batchSize) {
     for (int i = 0; i < batchSize; i++) {
-      timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
     statistics.update(timestamps, values, batchSize);
@@ -181,8 +196,10 @@ public class PageWriter {
 
   /** flush all data remained in encoders. */
   private void prepareEndWriteOnePage() throws IOException {
-    timeEncoder.flush(timeOut);
     valueEncoder.flush(valueOut);
+    if (size % 8 != 0) {
+      bitmapOut.write(bitmap);
+    }
   }
 
   /**
@@ -193,9 +210,9 @@ public class PageWriter {
    */
   public ByteBuffer getUncompressedBytes() throws IOException {
     prepareEndWriteOnePage();
-    ByteBuffer buffer = ByteBuffer.allocate(timeOut.size() + valueOut.size() + 4);
-    ReadWriteForEncodingUtils.writeUnsignedVarInt(timeOut.size(), buffer);
-    buffer.put(timeOut.getBuf(), 0, timeOut.size());
+    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bitmapOut.size() + valueOut.size());
+    buffer.putInt(size);
+    buffer.put(bitmapOut.getBuf(), 0, bitmapOut.size());
     buffer.put(valueOut.getBuf(), 0, valueOut.size());
     buffer.flip();
     return buffer;
@@ -204,7 +221,7 @@ public class PageWriter {
   /** write the page header and data into the PageWriter's output stream. */
   public int writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer, boolean first)
       throws IOException {
-    if (statistics.getCount() == 0) {
+    if (size == 0) {
       return 0;
     }
 
@@ -256,21 +273,16 @@ public class PageWriter {
    * @return allocated size in time, value and outputStream
    */
   public long estimateMaxMemSize() {
-    return timeOut.size()
-        + valueOut.size()
-        + timeEncoder.getMaxByteSize()
-        + valueEncoder.getMaxByteSize();
+    return Integer.BYTES + bitmapOut.size() + 1 + valueOut.size() + valueEncoder.getMaxByteSize();
   }
 
   /** reset this page */
-  public void reset(MeasurementSchema measurementSchema) {
-    timeOut.reset();
+  public void reset(TSDataType dataType) {
+    bitmapOut.reset();
+    size = 0;
+    bitmap = 0;
     valueOut.reset();
-    statistics = Statistics.getStatsByType(measurementSchema.getType());
-  }
-
-  public void setTimeEncoder(Encoder encoder) {
-    this.timeEncoder = encoder;
+    statistics = Statistics.getStatsByType(dataType);
   }
 
   public void setValueEncoder(Encoder encoder) {
@@ -288,4 +300,8 @@ public class PageWriter {
   public Statistics<?> getStatistics() {
     return statistics;
   }
+
+  public int getSize() {
+    return size;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
index b607a79..e8b4bee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
@@ -47,7 +47,7 @@ public class BooleanDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
index b988ed7..853313e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
@@ -47,7 +47,7 @@ public class DoubleDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
index 3d13f17..863be98 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
@@ -47,7 +47,7 @@ public class FloatDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
index a66378e..02e0d5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
@@ -47,7 +47,7 @@ public class IntDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
index 4bc2e9f..8dce510 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
@@ -47,7 +47,7 @@ public class LongDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
index 3a3918c..cf371bc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
@@ -48,7 +48,7 @@ public class StringDataPoint extends DataPoint {
       LOG.warn("given IChunkWriter is null, do nothing and return");
       return;
     }
-    writer.write(time, value);
+    writer.write(time, value, false);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
new file mode 100644
index 0000000..855e5fc
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.schema;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.List;
+
+public interface IMeasurementSchema {
+
+  String getMeasurementId();
+
+  CompressionType getCompressor();
+
+  TSDataType getType();
+
+  TSEncoding getTimeTSEncoding();
+
+  Encoder getTimeEncoder();
+
+  List<String> getValueMeasurementIdList();
+
+  List<TSDataType> getValueTSDataTypeList();
+
+  List<TSEncoding> getValueTSEncodingList();
+
+  List<Encoder> getValueEncoderList();
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6dfeec8..c68c78b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,6 +119,11 @@ public class TsFileIOWriter {
     startFile();
   }
 
+  /** for test only */
+  public TsFileIOWriter(TsFileOutput output, boolean test) {
+    this.out = output;
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total memory size exceeds the
    * chunk group size threshold.
@@ -163,35 +167,37 @@ public class TsFileIOWriter {
   /**
    * start a {@linkplain ChunkMetadata ChunkMetaData}.
    *
-   * @param measurementSchema - schema of this time series
+   * @param measurementId - measurementId of this time series
    * @param compressionCodecName - compression name of this time series
    * @param tsDataType - data type
    * @param statistics - Chunk statistics
    * @param dataSize - the serialized size of all pages
+   * @param mask - 0x80 for time chunk, 0x40 for value chunk, 0x00 for common chunk
    * @throws IOException if I/O error occurs
    */
   public void startFlushChunk(
-      MeasurementSchema measurementSchema,
+      String measurementId,
       CompressionType compressionCodecName,
       TSDataType tsDataType,
       TSEncoding encodingType,
       Statistics<?> statistics,
       int dataSize,
-      int numOfPages)
+      int numOfPages,
+      int mask)
       throws IOException {
 
     currentChunkMetadata =
-        new ChunkMetadata(
-            measurementSchema.getMeasurementId(), tsDataType, out.getPosition(), statistics);
+        new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
 
     ChunkHeader header =
         new ChunkHeader(
-            measurementSchema.getMeasurementId(),
+            measurementId,
             dataSize,
             tsDataType,
             compressionCodecName,
             encodingType,
-            numOfPages);
+            numOfPages,
+            mask);
     header.serializeTo(out.wrapAsStream());
   }
 
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 570ad8e..a0eb0e9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -62,12 +62,13 @@ public class TsFileIOWriterTest {
     // chunk group 1
     writer.startChunkGroup(deviceId);
     writer.startFlushChunk(
-        measurementSchema,
+        measurementSchema.getMeasurementId(),
         measurementSchema.getCompressor(),
         measurementSchema.getType(),
         measurementSchema.getEncodingType(),
         statistics,
         0,
+        0,
         0);
     writer.endCurrentChunk();
     writer.endChunkGroup();
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 64db027..e16ee55 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -131,13 +131,14 @@ public class RestorableTsFileIOWriterTest {
     writer
         .getIOWriter()
         .startFlushChunk(
-            new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN),
+            new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN).getMeasurementId(),
             CompressionType.SNAPPY,
             TSDataType.FLOAT,
             TSEncoding.PLAIN,
             new FloatStatistics(),
             100,
-            10);
+            10,
+            0);
     writer.getIOWriter().close();
 
     RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
new file mode 100644
index 0000000..df46b6f
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
@@ -0,0 +1,52 @@
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class TestTsFileOutput implements TsFileOutput {
+
+  PublicBAOS publicBAOS = new PublicBAOS();
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    publicBAOS.write(b);
+  }
+
+  @Override
+  public void write(byte b) {
+    publicBAOS.write(b);
+  }
+
+  @Override
+  public void write(ByteBuffer b) {
+    publicBAOS.write(b.array(), b.position(), b.limit());
+  }
+
+  @Override
+  public long getPosition() {
+    return publicBAOS.size();
+  }
+
+  @Override
+  public void close() throws IOException {
+    publicBAOS.close();
+  }
+
+  @Override
+  public OutputStream wrapAsStream() {
+    return publicBAOS;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    publicBAOS.flush();
+  }
+
+  @Override
+  public void truncate(long size) {
+    publicBAOS.truncate((int) size);
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
new file mode 100644
index 0000000..9968815
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class TimeChunkWriterTest {
+
+  @Test
+  public void testWrite1() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    TimeChunkWriter chunkWriter =
+        new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+    for (long time = 1; time <= 10; time++) {
+      chunkWriter.write(time);
+    }
+    assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage());
+    chunkWriter.sealCurrentPage();
+    // page without statistics size: 82 + chunk header size: 8
+    assertEquals(90L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      assertEquals(
+          (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+      assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(82, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testWrite2() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    TimeChunkWriter chunkWriter =
+        new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+    for (long time = 1; time <= 10; time++) {
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+    for (long time = 11; time <= 20; time++) {
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+    assertEquals(2, chunkWriter.getNumOfPages());
+    // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9
+    assertEquals(207L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+      assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(198, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
new file mode 100644
index 0000000..2ec6294
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TimePageWriterTest {
+
+  @Test
+  public void testWrite() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    try {
+      pageWriter.write(1L);
+      assertEquals(8, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset();
+      assertEquals(0, pageWriter.estimateMaxMemSize());
+      byte[] timeBytes = new byte[8];
+      buffer.get(timeBytes);
+      ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes);
+      PlainDecoder decoder = new PlainDecoder();
+      assertEquals(1L, decoder.readLong(buffer2));
+      decoder.reset();
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      pageWriter.write(1L);
+      pageWriter.write(2L);
+      pageWriter.write(3L);
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+      // total size
+      assertEquals(26, publicBAOS.size());
+      TimeStatistics statistics = pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(3L, statistics.getEndTime());
+      assertEquals(3, statistics.getCount());
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // uncompressedSize
+      assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+      assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+      assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      pageWriter.write(1L);
+      pageWriter.write(2L);
+      pageWriter.write(3L);
+      // with page statistics
+      assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+      // total size
+      assertEquals(43, publicBAOS.size());
+      TimeStatistics statistics = pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(3L, statistics.getEndTime());
+      assertEquals(3, statistics.getCount());
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // uncompressedSize
+      assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      TimeStatistics testStatistics =
+          (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector);
+      assertEquals(1L, testStatistics.getStartTime());
+      assertEquals(3L, testStatistics.getEndTime());
+      assertEquals(3, testStatistics.getCount());
+      assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+      assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+      assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+    Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+    TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      pageWriter.write(1L);
+      pageWriter.write(2L);
+      pageWriter.write(3L);
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+
+      // total size
+      assertEquals(22, publicBAOS.size());
+      TimeStatistics statistics = pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(3L, statistics.getEndTime());
+      assertEquals(3, statistics.getCount());
+      ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // uncompressedSize
+      assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+      // compressedSize
+      assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+      byte[] compress = new byte[20];
+      compressedBuffer.get(compress);
+      byte[] uncompress = new byte[24];
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+      unCompressor.uncompress(compress, 0, 20, uncompress, 0);
+      ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+      assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+      assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+      assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+    } catch (IOException e) {
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
new file mode 100644
index 0000000..3cc8272
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValueChunkWriterTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link ValueChunkWriter}: verifies the in-memory chunk size and the
+ * serialized value-chunk header for a single-page chunk and a multi-page chunk.
+ */
+public class ValueChunkWriterTest {
+
+  @Test
+  public void testWrite1() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ValueChunkWriter chunkWriter =
+        new ValueChunkWriter(
+            "s1", CompressionType.UNCOMPRESSED, TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder);
+    // every 4th point is written as null, so 15 of the 20 points carry a float value
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, time % 4 == 0);
+    }
+    chunkWriter.sealCurrentPage();
+    // page without statistics size: 69 + chunk header size: 8
+    assertEquals(77L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // 0x40 flags the header as the value column of a vector
+      assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+      // chunk data size (the single page, statistics omitted)
+      assertEquals(69, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(69, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testWrite2() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ValueChunkWriter chunkWriter =
+        new ValueChunkWriter(
+            "s1", CompressionType.UNCOMPRESSED, TSDataType.FLOAT, TSEncoding.PLAIN, valueEncoder);
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, time % 4 == 0);
+    }
+    chunkWriter.sealCurrentPage();
+    // start at 21: timestamps within a chunk are strictly increasing, so the second page
+    // must not repeat the first page's last timestamp (20). Both pages still hold 15
+    // non-null values and a 3-byte bitmap, so the size assertions below are unaffected.
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, (float) time, time % 4 == 0);
+    }
+    chunkWriter.sealCurrentPage();
+    // two pages with statistics size: (69 + 41) * 2 + chunk header size: 9
+    assertEquals(229L, chunkWriter.getCurrentChunkSize());
+
+    TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+    TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+    try {
+      chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // multi-page chunk: CHUNK_HEADER marker instead of ONLY_ONE_PAGE_CHUNK_HEADER
+      assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+      // chunk data size: two pages, each including its 41-byte statistics
+      assertEquals(220, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(220, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
new file mode 100644
index 0000000..a43159f
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/ValuePageWriterTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link ValuePageWriter}: verifies the null bitmap layout, the
+ * PLAIN-encoded value data, the page statistics, and the serialized page header
+ * with and without SNAPPY compression.
+ */
+public class ValuePageWriterTest {
+
+  @Test
+  public void testWrite1() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    try {
+      pageWriter.write(1L, 1.0f, false);
+      // 9 = 4 (bitmap length int) + 1 (bitmap byte) + 4 (one float), matching the reads below
+      assertEquals(9, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      // a reset (empty) page still reports 5 bytes of fixed overhead
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      assertEquals(1, ReadWriteIOUtils.readInt(buffer));
+      // single non-null point: only the highest bit of the bitmap byte is set
+      assertEquals(((byte) (1 << 7)), ReadWriteIOUtils.readByte(buffer));
+      PlainDecoder decoder = new PlainDecoder();
+      assertEquals(1.0f, ReadWriteIOUtils.readFloat(buffer), 0.000001f);
+      assertEquals(0, buffer.remaining());
+      decoder.reset();
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWrite2() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    try {
+      // every 4th point is null, so 12 of the 16 points carry a value
+      for (int time = 1; time <= 16; time++) {
+        pageWriter.write(time, (float) time, time % 4 == 0);
+      }
+      assertEquals(55, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      assertEquals(16, ReadWriteIOUtils.readInt(buffer));
+      // 0xEE = 1110 1110: every 4th bit cleared for the null points
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      PlainDecoder decoder = new PlainDecoder();
+      // only non-null values are serialized after the bitmap
+      for (int value = 1; value <= 16; value++) {
+        if (value % 4 != 0) {
+          assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), 0.000001f);
+        }
+      }
+      assertEquals(0, buffer.remaining());
+      decoder.reset();
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWrite3() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    try {
+      // 20 points crossing a bitmap-byte boundary: 3 bitmap bytes, 15 non-null values
+      for (int time = 1; time <= 20; time++) {
+        pageWriter.write(time, (float) time, time % 4 == 0);
+      }
+      assertEquals(67, pageWriter.estimateMaxMemSize());
+      ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+      pageWriter.reset(TSDataType.FLOAT);
+      assertEquals(5, pageWriter.estimateMaxMemSize());
+      assertEquals(20, ReadWriteIOUtils.readInt(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      // last bitmap byte covers points 17-20 only; trailing bits are zero padding
+      assertEquals(((byte) (0xE0)), ReadWriteIOUtils.readByte(buffer));
+      PlainDecoder decoder = new PlainDecoder();
+      for (int value = 1; value <= 20; value++) {
+        if (value % 4 != 0) {
+          assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), 0.000001f);
+        }
+      }
+      assertEquals(0, buffer.remaining());
+      decoder.reset();
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      for (int time = 1; time <= 20; time++) {
+        pageWriter.write(time, (float) time, time % 4 == 0);
+      }
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+      // total size
+      assertEquals(69, publicBAOS.size());
+      // statistics only cover the 15 non-null points (nulls are skipped)
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(19L, statistics.getEndTime());
+      assertEquals(15, statistics.getCount());
+      assertEquals(1.0f, statistics.getFirstValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getLastValue(), 0.000001f);
+      assertEquals(1.0f, statistics.getMinValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getMaxValue(), 0.000001f);
+      assertEquals(150.0f, (float) statistics.getSumDoubleValue(), 0.000001f);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+
+      // uncompressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+
+      // bitmap
+      assertEquals(20, ReadWriteIOUtils.readInt(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xE0)), ReadWriteIOUtils.readByte(buffer));
+
+      for (int value = 1; value <= 20; value++) {
+        if (value % 4 != 0) {
+          assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), 0.000001f);
+        }
+      }
+      assertEquals(0, buffer.remaining());
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      for (int time = 1; time <= 20; time++) {
+        pageWriter.write(time, (float) time, time % 4 == 0);
+      }
+      // with page statistics ("first" == false), so 0 bytes are omitted from the header
+      assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+      // total size: 2 (size varints) + 41 (statistics) + 67 (bitmap and data)
+      assertEquals(110, publicBAOS.size());
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(19L, statistics.getEndTime());
+      assertEquals(15, statistics.getCount());
+      assertEquals(1.0f, statistics.getFirstValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getLastValue(), 0.000001f);
+      assertEquals(1.0f, statistics.getMinValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getMaxValue(), 0.000001f);
+      assertEquals(150.0f, (float) statistics.getSumDoubleValue(), 0.000001f);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // uncompressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+
+      // Statistics: round-trips through serialization identically
+      FloatStatistics testStatistics =
+          (FloatStatistics) FloatStatistics.deserialize(buffer, TSDataType.FLOAT);
+      assertEquals(1L, testStatistics.getStartTime());
+      assertEquals(19L, testStatistics.getEndTime());
+      assertEquals(15, testStatistics.getCount());
+      assertEquals(1.0f, testStatistics.getFirstValue(), 0.000001f);
+      assertEquals(19.0f, testStatistics.getLastValue(), 0.000001f);
+      assertEquals(1.0f, testStatistics.getMinValue(), 0.000001f);
+      assertEquals(19.0f, testStatistics.getMaxValue(), 0.000001f);
+      assertEquals(150.0f, (float) testStatistics.getSumDoubleValue(), 0.000001f);
+
+      // bitmap
+      assertEquals(20, ReadWriteIOUtils.readInt(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(((byte) (0xE0)), ReadWriteIOUtils.readByte(buffer));
+
+      for (int value = 1; value <= 20; value++) {
+        if (value % 4 != 0) {
+          assertEquals((float) value, ReadWriteIOUtils.readFloat(buffer), 0.000001f);
+        }
+      }
+      assertEquals(0, buffer.remaining());
+    } catch (IOException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+    Encoder valueEncoder = new PlainEncoder(TSDataType.FLOAT, 0);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+    ValuePageWriter pageWriter = new ValuePageWriter(valueEncoder, compressor, TSDataType.FLOAT);
+    PublicBAOS publicBAOS = new PublicBAOS();
+    try {
+      for (int time = 1; time <= 20; time++) {
+        pageWriter.write(time, (float) time, time % 4 == 0);
+      }
+      // without page statistics
+      assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+      // total size (SNAPPY expands this small incompressible payload: 67 -> 70 bytes)
+      assertEquals(72, publicBAOS.size());
+      Statistics<Float> statistics = (Statistics<Float>) pageWriter.getStatistics();
+      assertEquals(1L, statistics.getStartTime());
+      assertEquals(19L, statistics.getEndTime());
+      assertEquals(15, statistics.getCount());
+      assertEquals(1.0f, statistics.getFirstValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getLastValue(), 0.000001f);
+      assertEquals(1.0f, statistics.getMinValue(), 0.000001f);
+      assertEquals(19.0f, statistics.getMaxValue(), 0.000001f);
+      assertEquals(150.0f, (float) statistics.getSumDoubleValue(), 0.000001f);
+
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+
+      // uncompressedSize
+      assertEquals(67, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      // compressedSize
+      assertEquals(70, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+
+      // decompress and verify the payload matches the uncompressed layout
+      byte[] compress = new byte[70];
+      buffer.get(compress);
+      byte[] uncompress = new byte[67];
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+      unCompressor.uncompress(compress, 0, 70, uncompress, 0);
+      ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+
+      // bitmap
+      assertEquals(20, ReadWriteIOUtils.readInt(uncompressedBuffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(uncompressedBuffer));
+      assertEquals(((byte) (0xEE)), ReadWriteIOUtils.readByte(uncompressedBuffer));
+      assertEquals(((byte) (0xE0)), ReadWriteIOUtils.readByte(uncompressedBuffer));
+
+      for (int value = 1; value <= 20; value++) {
+        if (value % 4 != 0) {
+          assertEquals((float) value, ReadWriteIOUtils.readFloat(uncompressedBuffer), 0.000001f);
+        }
+      }
+      assertEquals(0, uncompressedBuffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
new file mode 100644
index 0000000..93e18bb
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link VectorChunkWriterImpl}: writes a vector of three components
+ * (s1 FLOAT, s2 INT32, s3 DOUBLE, see {@link VectorMeasurementSchemaStub}) and verifies
+ * the serialized layout: one time chunk (0x80-flagged header) followed by one value
+ * chunk (0x40-flagged header) per component.
+ */
+public class VectorChunkWriterImplTest {
+
+  @Test
+  public void testWrite1() {
+    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+    // write the three component values first, then the shared timestamp
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+
+    chunkWriter.sealCurrentPage();
+    // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80; value chunk 2: 8 + 2 + 4 + 3 +
+    // 20; value chunk 3: 9 + 4 + 7 + 20 * 8;
+    assertEquals(492L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeToFileWriter(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // time chunk (0x80 flags the time column of a vector)
+      assertEquals(
+          (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(164, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      // skip the time page payload
+      buffer.position(buffer.position() + 164);
+
+      // value chunk 1
+      assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(89, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      buffer.position(buffer.position() + 89);
+
+      // value chunk 2
+      assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(29, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      buffer.position(buffer.position() + 29);
+
+      // value chunk 3
+      assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(171, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(171, buffer.remaining());
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testWrite2() {
+    VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+    VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+    // second page continues at 21 so timestamps stay strictly increasing
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    // time chunk: 14 + (4 + 17 + 160) * 2
+    // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
+    // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
+    // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
+    assertEquals(1259L, chunkWriter.getCurrentChunkSize());
+
+    try {
+      TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+      TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+      chunkWriter.writeToFileWriter(writer);
+      PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+      ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+      // time chunk (multi-page, so CHUNK_HEADER marker)
+      assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(362, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      buffer.position(buffer.position() + 362);
+
+      // value chunk 1
+      assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(260, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      buffer.position(buffer.position() + 260);
+
+      // value chunk 2
+      assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(140, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      buffer.position(buffer.position() + 140);
+
+      // value chunk 3
+      assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+      assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+      assertEquals(456, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+      assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+      assertEquals(456, buffer.remaining());
+
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
new file mode 100644
index 0000000..795a0a6
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test-only {@link IMeasurementSchema} describing a vector measurement "s1.time" with
+ * three uncompressed, PLAIN-encoded components: s1 (FLOAT), s2 (INT32) and s3 (DOUBLE).
+ * Used by the vector chunk-writer tests in this package.
+ */
+public class VectorMeasurementSchemaStub implements IMeasurementSchema {
+
+  @Override
+  public String getMeasurementId() {
+    return "s1.time";
+  }
+
+  @Override
+  public CompressionType getCompressor() {
+    return CompressionType.UNCOMPRESSED;
+  }
+
+  @Override
+  public TSDataType getType() {
+    return TSDataType.Vector;
+  }
+
+  @Override
+  public TSEncoding getTimeTSEncoding() {
+    return TSEncoding.PLAIN;
+  }
+
+  @Override
+  public Encoder getTimeEncoder() {
+    // a fresh encoder per call; timestamps are INT64
+    return new PlainEncoder(TSDataType.INT64, 0);
+  }
+
+  @Override
+  public List<String> getValueMeasurementIdList() {
+    return Arrays.asList("s1", "s2", "s3");
+  }
+
+  @Override
+  public List<TSDataType> getValueTSDataTypeList() {
+    return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public List<TSEncoding> getValueTSEncodingList() {
+    return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
+  }
+
+  @Override
+  public List<Encoder> getValueEncoderList() {
+    // fresh encoders per call, one per component, ordered like the id list above
+    return Arrays.asList(
+        new PlainEncoder(TSDataType.FLOAT, 0),
+        new PlainEncoder(TSDataType.INT32, 0),
+        new PlainEncoder(TSDataType.DOUBLE, 0));
+  }
+}