You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:22 UTC

[iotdb] 02/04: merge Vector

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56d92eb416866d86d8217d5a2f0de1b425b8237f
Merge: eb42aed 9c72690
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 14:05:44 2021 +0800

    merge Vector

 .../iotdb/cluster/client/DataClientProvider.java   |   4 +-
 .../cluster/client/async/AsyncClientPool.java      |  34 ++-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  59 +++--
 .../iotdb/cluster/client/sync/SyncDataClient.java  |   9 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |  10 +-
 .../exception/BadSeedUrlFormatException.java       |   3 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  78 ++----
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  14 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  37 +--
 .../cluster/query/aggregate/ClusterAggregator.java |  14 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |  14 +-
 .../query/groupby/RemoteGroupByExecutor.java       |  27 +-
 .../query/last/ClusterLastQueryExecutor.java       |  14 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  14 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  12 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |  11 +-
 .../cluster/server/PullSnapshotHintService.java    |   9 +-
 .../cluster/server/service/BaseSyncService.java    |  15 +-
 .../cluster/server/service/DataSyncService.java    |   7 +-
 .../cluster/server/service/MetaSyncService.java    |   5 +-
 .../cluster/client/DataClientProviderTest.java     | 136 ++++++++++
 .../cluster/client/sync/SyncClientPoolTest.java    |  17 +-
 .../cluster/client/sync/SyncDataClientTest.java    |  51 ++++
 .../cluster/client/sync/SyncMetaClientTest.java    |  47 ++++
 server/src/assembly/resources/conf/iotdb-env.bat   |   5 +
 server/src/assembly/resources/conf/iotdb-env.sh    |   3 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  12 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  12 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  27 +-
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  15 +-
 .../file/metadata/statistics/Statistics.java       |  94 ++-----
 .../file/metadata/statistics/TimeStatistics.java   | 161 ++++++++++++
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   4 +
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  18 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  24 +-
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     |  15 +-
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  | 255 ++++++++++++++++++
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  | 202 ++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |   1 +
 .../iotdb/tsfile/write/page/TimePageWriter.java    | 177 +++++++++++++
 .../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
 .../write/record/datapoint/BooleanDataPoint.java   |   2 +-
 .../write/record/datapoint/DoubleDataPoint.java    |   2 +-
 .../write/record/datapoint/FloatDataPoint.java     |   2 +-
 .../write/record/datapoint/IntDataPoint.java       |   2 +-
 .../write/record/datapoint/LongDataPoint.java      |   2 +-
 .../write/record/datapoint/StringDataPoint.java    |   2 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |  34 ++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  22 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   3 +-
 .../write/writer/RestorableTsFileIOWriterTest.java |   5 +-
 .../tsfile/write/writer/TestTsFileOutput.java      |  70 +++++
 .../tsfile/write/writer/TimeChunkWriterTest.java   | 111 ++++++++
 .../tsfile/write/writer/TimePageWriterTest.java    | 171 ++++++++++++
 .../tsfile/write/writer/ValueChunkWriterTest.java  | 109 ++++++++
 .../tsfile/write/writer/ValuePageWriterTest.java   | 291 +++++++++++++++++++++
 .../write/writer/VectorChunkWriterImplTest.java    | 178 +++++++++++++
 .../write/writer/VectorMeasurementSchemaStub.java  |  80 ++++++
 59 files changed, 2722 insertions(+), 442 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index dcd82ab,caa0893..c1bc5a9
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@@ -173,29 -173,23 +173,29 @@@ public class MemTableFlushTask 
  
              switch (dataType) {
                case BOOLEAN:
-                 seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+                 seriesWriterImpl.write(time, tvPairs.getBoolean(i), false);
                  break;
                case INT32:
-                 seriesWriterImpl.write(time, tvPairs.getInt(i));
+                 seriesWriterImpl.write(time, tvPairs.getInt(i), false);
                  break;
                case INT64:
-                 seriesWriterImpl.write(time, tvPairs.getLong(i));
+                 seriesWriterImpl.write(time, tvPairs.getLong(i), false);
                  break;
                case FLOAT:
-                 seriesWriterImpl.write(time, tvPairs.getFloat(i));
+                 seriesWriterImpl.write(time, tvPairs.getFloat(i), false);
                  break;
                case DOUBLE:
-                 seriesWriterImpl.write(time, tvPairs.getDouble(i));
+                 seriesWriterImpl.write(time, tvPairs.getDouble(i), false);
                  break;
                case TEXT:
-                 seriesWriterImpl.write(time, tvPairs.getBinary(i));
+                 seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
                  break;
 +              case VECTOR:
 +                // TODO: 
 +//                for ( : tvPairs.getVector(i)) {
 +//                  seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
 +//                }
 +                break;
                default:
                  LOGGER.error(
                      "Storage group {} does not support data type: {}", storageGroup, dataType);
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 1ec2001,1c1ba7f..759a751
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@@ -77,6 -77,8 +77,8 @@@ public abstract class Statistics<T> 
          return new DoubleStatistics();
        case FLOAT:
          return new FloatStatistics();
 -      case Vector:
++      case VECTOR:
+         return new TimeStatistics();
        default:
          throw new UnknownColumnTypeException(type.toString());
      }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
index 0000000,74bd701..e812166
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
@@@ -1,0 -1,161 +1,161 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.file.metadata.statistics;
+ 
+ import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.ByteBuffer;
+ 
+ public class TimeStatistics extends Statistics<Long> { // statistics type used by TimeChunkWriter
+ 
+   static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40;
+ 
+   @Override
+   public TSDataType getType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   @Override
+   public int getStatsSize() {
+     return 0; // serializeStats below writes nothing, so the value part has no size
+   }
+ 
+   @Override
+   public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+     throw new StatisticsClassException("Time statistics does not support: set min max from bytes");
+   }
+ 
+   @Override
+   public Long getMinValue() {
+     throw new StatisticsClassException("Time statistics does not support: min value");
+   }
+ 
+   @Override
+   public Long getMaxValue() {
+     throw new StatisticsClassException("Time statistics does not support: max value");
+   }
+ 
+   @Override
+   public Long getFirstValue() {
+     throw new StatisticsClassException("Time statistics does not support: first value");
+   }
+ 
+   @Override
+   public Long getLastValue() {
+     throw new StatisticsClassException("Time statistics does not support: last value");
+   }
+ 
+   @Override
+   public double getSumDoubleValue() {
+     throw new StatisticsClassException("Time statistics does not support: double sum");
+   }
+ 
+   @Override
+   public long getSumLongValue() {
+     throw new StatisticsClassException("Time statistics does not support: long sum");
+   }
+ 
+   @Override
+   void updateStats(long value) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   void updateStats(long[] values, int batchSize) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   public void updateStats(long minValue, long maxValue) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   public long calculateRamSize() {
+     return TIME_STATISTICS_FIXED_RAM_SIZE;
+   }
+ 
+   @Override
+   protected void mergeStatisticsValue(Statistics stats) {} // no value statistics to merge
+ 
+   @Override
+   public byte[] getMinValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+   }
+ 
+   @Override
+   public byte[] getMaxValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get max value bytes");
+   }
+ 
+   @Override
+   public byte[] getFirstValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get first value bytes");
+   }
+ 
+   @Override
+   public byte[] getLastValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get last value bytes");
+   }
+ 
+   @Override
+   public byte[] getSumValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get sum value bytes");
+   }
+ 
+   @Override
+   public ByteBuffer getMinValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get min value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getMaxValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get max value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getFirstValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get first value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getLastValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get last value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getSumValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get sum value buffer");
+   }
+ 
+   @Override
+   public int serializeStats(OutputStream outputStream) {
+     return 0; // writes nothing to outputStream and reports zero bytes
+   }
+ 
+   @Override
+   public void deserialize(InputStream inputStream) throws IOException {} // reads nothing
+ 
+   @Override
+   public void deserialize(ByteBuffer byteBuffer) {} // reads nothing
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 0000000,df9ded9..522eff5
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@@ -1,0 -1,255 +1,255 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.chunk;
+ 
+ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+ import org.apache.iotdb.tsfile.file.header.PageHeader;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.IOException;
+ 
+ public class TimeChunkWriter { // buffers timestamp pages and flushes them as one VECTOR chunk
+ 
+   private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
+ 
+   private final String measurementId;
+ 
+   private final TSEncoding encodingType;
+ 
+   private final CompressionType compressionType;
+ 
+   /** all pages of this chunk. */
+   private final PublicBAOS pageBuffer;
+ 
+   private int numOfPages;
+ 
+   /** write data into current page */
+   private TimePageWriter pageWriter;
+ 
+   /** page size threshold. */
+   private final long pageSizeThreshold;
+ 
+   private final int maxNumberOfPointsInPage;
+ 
+   /** point count of the current page at which its memory size is checked next. */
+   private int valueCountInOnePageForNextCheck;
+ 
+   // initial value for valueCountInOnePageForNextCheck
+   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+ 
+   /** statistic of this chunk. */
+   private TimeStatistics statistics;
+ 
+   /** size returned when the first page was written without its statistics. */
+   private int sizeWithoutStatistic;
+ 
+   private Statistics<?> firstPageStatistics;
+ 
+   public TimeChunkWriter(
+       String measurementId,
+       CompressionType compressionType,
+       TSEncoding encodingType,
+       Encoder timeEncoder) {
+     this.measurementId = measurementId;
+     this.encodingType = encodingType;
+     this.compressionType = compressionType;
+     this.pageBuffer = new PublicBAOS();
+ 
+     this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+     this.maxNumberOfPointsInPage =
+         TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+     // initial check of memory usage. So that we have enough data to make an initial prediction
+     this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ 
+     // init statistics for this chunk and page
+     this.statistics = new TimeStatistics();
+ 
+     this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+   }
+ 
+   public void write(long time) {
+     pageWriter.write(time);
+   }
+ 
+   public void write(long[] timestamps, int batchSize) {
+     pageWriter.write(timestamps, batchSize);
+   }
+ 
+   /**
+    * check the point count and estimated memory size of the current page.
+    * @return true if the caller should seal the page now; this method itself does not seal it
+    */
+   public boolean checkPageSizeAndMayOpenANewPage() {
+     if (pageWriter.getPointNumber() >= maxNumberOfPointsInPage) { // >=: a batch could overshoot
+       logger.debug("current line count reaches the upper bound, write page {}", measurementId);
+       return true;
+     } else if (pageWriter.getPointNumber()
+         >= valueCountInOnePageForNextCheck) { // need to check memory size
+       // not checking the memory used for every value
+       long currentPageSize = pageWriter.estimateMaxMemSize();
+       if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+         // we will write the current page
+         logger.debug(
+             "enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
+             measurementId,
+             pageSizeThreshold,
+             currentPageSize,
+             pageWriter.getPointNumber());
+         valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+         return true;
+       } else {
+         // scale the current point count by threshold/size to pick the next check point
+         valueCountInOnePageForNextCheck =
+             (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
+       }
+     }
+     return false;
+   }
+ 
+   public void writePageToPageBuffer() {
+     try {
+       if (numOfPages == 0) { // record the firstPageStatistics
+         this.firstPageStatistics = pageWriter.getStatistics();
+         this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+       } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+         byte[] b = pageBuffer.toByteArray();
+         pageBuffer.reset();
+         pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+         firstPageStatistics.serialize(pageBuffer);
+         pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+         firstPageStatistics = null;
+       } else {
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+       }
+ 
+       // update statistics of this chunk
+       numOfPages++;
+       this.statistics.mergeStatistics(pageWriter.getStatistics());
+     } catch (IOException e) {
+       logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+     } finally {
+       // always reset the page writer, whether or not the page was written
+       pageWriter.reset();
+     }
+   }
+ 
+   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+     sealCurrentPage();
+     writeAllPagesOfChunkToTsFile(tsfileWriter);
+ 
+     // reinit this chunk writer
+     pageBuffer.reset();
+     numOfPages = 0;
+     firstPageStatistics = null;
+     this.statistics = new TimeStatistics();
+   }
+ 
+   public long estimateMaxSeriesMemSize() {
+     return pageBuffer.size()
+         + pageWriter.estimateMaxMemSize()
+         + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+         + pageWriter.getStatistics().getSerializedSize();
+   }
+ 
+   public long getCurrentChunkSize() {
+     if (pageBuffer.size() == 0) {
+       return 0;
+     }
+     // return the serialized size of the chunk header + all pages
+     return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+         + (long) pageBuffer.size();
+   }
+ 
+   public void sealCurrentPage() {
+     if (pageWriter != null && pageWriter.getPointNumber() > 0) {
+       writePageToPageBuffer();
+     }
+   }
+ 
+   public void clearPageWriter() {
+     pageWriter = null;
+   }
+ 
+   public int getNumOfPages() {
+     return numOfPages;
+   }
+ 
+   public TSDataType getDataType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   /**
+    * write all buffered pages of this chunk to the specified IOWriter; no-op if the chunk is empty.
+    *
+    * @param writer the specified IOWriter
+    * @throws IOException exception in IO
+    */
+   public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+     if (statistics.getCount() == 0) {
+       return;
+     }
+ 
+     // start to write this column chunk
+     writer.startFlushChunk(
+         measurementId,
+         compressionType,
 -        TSDataType.Vector,
++        TSDataType.VECTOR,
+         encodingType,
+         statistics,
+         pageBuffer.size(),
+         numOfPages,
+         0x80); // NOTE(review): presumably a mask flagging a vector time chunk - confirm
+ 
+     long dataOffset = writer.getPos();
+ 
+     // write all pages of this column
+     writer.writeBytesToStream(pageBuffer);
+ 
+     int dataSize = (int) (writer.getPos() - dataOffset);
+     if (dataSize != pageBuffer.size()) {
+       throw new IOException(
+           "Bytes written is inconsistent with the size of data: "
+               + dataSize
+               + " !="
+               + " "
+               + pageBuffer.size());
+     }
+ 
+     writer.endCurrentChunk();
+   }
+ 
+   /** only used for test */
+   public PublicBAOS getPageBuffer() {
+     return pageBuffer;
+   }
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
index 0000000,af71ecd..8f1e907
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
@@@ -1,0 -1,202 +1,202 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.chunk;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.Binary;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ public class VectorChunkWriterImpl implements IChunkWriter { // one time writer + N value writers
+ 
+   private final TimeChunkWriter timeChunkWriter;
+   private final List<ValueChunkWriter> valueChunkWriterList;
+   private int valueIndex; // index of the value writer that the next value write(...) call uses
+ 
+   /** @param schema supplies the time-column settings and the per-value-column settings */
+   public VectorChunkWriterImpl(IMeasurementSchema schema) {
+     timeChunkWriter =
+         new TimeChunkWriter(
+             schema.getMeasurementId(),
+             schema.getCompressor(),
+             schema.getTimeTSEncoding(),
+             schema.getTimeEncoder());
+ 
+     List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+     List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList();
+     List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList();
+     List<Encoder> valueEncoderList = schema.getValueEncoderList();
+ 
+     valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+     for (int i = 0; i < valueMeasurementIdList.size(); i++) { // one writer per value column
+       valueChunkWriterList.add(
+           new ValueChunkWriter(
+               valueMeasurementIdList.get(i),
+               schema.getCompressor(),
+               valueTSDataTypeList.get(i),
+               valueTSEncodingList.get(i),
+               valueEncoderList.get(i)));
+     }
+ 
+     this.valueIndex = 0;
+   }
+ 
+   @Override
+   public void write(long time, int value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull); // then move to next column
+   }
+ 
+   @Override
+   public void write(long time, long value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, boolean value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, float value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, double value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, Binary value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time) {
+     valueIndex = 0; // the next value write goes to the first value writer again
+     timeChunkWriter.write(time);
+     if (checkPageSizeAndMayOpenANewPage()) {
+       writePageToPageBuffer(); // seals the time page and every value page together
+     }
+   }
+ 
+   // TODO tsfile write interface
+   @Override
+   public void write(long[] timestamps, int[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, long[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, boolean[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, float[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, double[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, Binary[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * ask the time chunk writer whether the current page should be sealed; only the time chunk
+    * writer is consulted, the value chunk writers are not checked here
+    */
+   private boolean checkPageSizeAndMayOpenANewPage() {
+     return timeChunkWriter.checkPageSizeAndMayOpenANewPage();
+   }
+ 
+   private void writePageToPageBuffer() {
+     timeChunkWriter.writePageToPageBuffer();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.writePageToPageBuffer();
+     }
+   }
+ 
+   @Override
+   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+     timeChunkWriter.writeToFileWriter(tsfileWriter); // time chunk first, then each value chunk
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.writeToFileWriter(tsfileWriter);
+     }
+   }
+ 
+   @Override
+   public long estimateMaxSeriesMemSize() {
+     long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
+     }
+     return estimateMaxSeriesMemSize; // sum over the time writer and all value writers
+   }
+ 
+   @Override
+   public long getCurrentChunkSize() {
+     long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       currentChunkSize += valueChunkWriter.getCurrentChunkSize();
+     }
+     return currentChunkSize; // sum over the time writer and all value writers
+   }
+ 
+   @Override
+   public void sealCurrentPage() {
+     timeChunkWriter.sealCurrentPage();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.sealCurrentPage();
+     }
+   }
+ 
+   @Override
+   public void clearPageWriter() {
+     timeChunkWriter.clearPageWriter();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.clearPageWriter();
+     }
+   }
+ 
+   @Override
+   public int getNumOfPages() {
+     return timeChunkWriter.getNumOfPages(); // page count is taken from the time chunk writer
+   }
+ 
+   @Override
+   public TSDataType getDataType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
index 0000000,9968815..bdca8d5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@@ -1,0 -1,111 +1,111 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.fail;
+ 
+ public class TimeChunkWriterTest {
+ 
+   @Test
+   public void testWrite1() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     TimeChunkWriter chunkWriter =
+         new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+     for (long time = 1; time <= 10; time++) {
+       chunkWriter.write(time);
+     }
+     assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage());
+     chunkWriter.sealCurrentPage();
+     // page without statistics size: 82 + chunk header size: 8
+     assertEquals(90L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       assertEquals(
+           (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(82, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWrite2() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     TimeChunkWriter chunkWriter =
+         new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+     for (long time = 1; time <= 10; time++) {
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     for (long time = 11; time <= 20; time++) {
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     assertEquals(2, chunkWriter.getNumOfPages());
+     // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9
+     assertEquals(207L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(198, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
index 0000000,2ec6294..cab975c
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@@ -1,0 -1,171 +1,171 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.compress.IUnCompressor;
+ import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ public class TimePageWriterTest {
+ 
+   @Test
+   public void testWrite() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     try {
+       pageWriter.write(1L);
+       assertEquals(8, pageWriter.estimateMaxMemSize());
+       ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+       ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+       pageWriter.reset();
+       assertEquals(0, pageWriter.estimateMaxMemSize());
+       byte[] timeBytes = new byte[8];
+       buffer.get(timeBytes);
+       ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes);
+       PlainDecoder decoder = new PlainDecoder();
+       assertEquals(1L, decoder.readLong(buffer2));
+       decoder.reset();
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // without page statistics
+       assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+       // total size
+       assertEquals(26, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       // compressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // with page statistics
+       assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+       // total size
+       assertEquals(43, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       // compressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       TimeStatistics testStatistics =
 -          (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector);
++          (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.VECTOR);
+       assertEquals(1L, testStatistics.getStartTime());
+       assertEquals(3L, testStatistics.getEndTime());
+       assertEquals(3, testStatistics.getCount());
+       assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // without page statistics
+       assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+ 
+       // total size
+       assertEquals(22, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+       // compressedSize
+       assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+       byte[] compress = new byte[20];
+       compressedBuffer.get(compress);
+       byte[] uncompress = new byte[24];
+       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+       unCompressor.uncompress(compress, 0, 20, uncompress, 0);
+       ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+       assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
index 0000000,93e18bb..3ca81b1
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
@@@ -1,0 -1,178 +1,178 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ public class VectorChunkWriterImplTest {
+ 
+   @Test
+   public void testWrite1() {
+     VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+     VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+ 
+     for (int time = 1; time <= 20; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+ 
+     chunkWriter.sealCurrentPage();
+     // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80; value chunk 2: 8 + 2 + 4 + 3 +
+     // 20; value chunk 3: 9 + 4 + 7 + 20 * 8;
+     assertEquals(492L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeToFileWriter(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // time chunk
+       assertEquals(
+           (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(164, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 164);
+ 
+       // value chunk 1
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(89, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 89);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(29, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 29);
+ 
+       // value chunk 3
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(171, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(171, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWrite2() {
+     VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+     VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+ 
+     for (int time = 1; time <= 20; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     for (int time = 21; time <= 40; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+ 
+     // time chunk: 14 + (4 + 17 + 160) * 2
+     // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
+     // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
+     // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
+     assertEquals(1259L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeToFileWriter(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // time chunk
+       assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(362, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 362);
+ 
+       // value chunk 1
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(260, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 260);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(140, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 140);
+ 
+       // value chunk 3
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(456, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(456, buffer.remaining());
+ 
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
index 0000000,795a0a6..40335f5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
@@@ -1,0 -1,80 +1,80 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+ 
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ public class VectorMeasurementSchemaStub implements IMeasurementSchema {
+ 
+   @Override
+   public String getMeasurementId() {
+     return "s1.time";
+   }
+ 
+   @Override
+   public CompressionType getCompressor() {
+     return CompressionType.UNCOMPRESSED;
+   }
+ 
+   @Override
+   public TSDataType getType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   @Override
+   public TSEncoding getTimeTSEncoding() {
+     return TSEncoding.PLAIN;
+   }
+ 
+   @Override
+   public Encoder getTimeEncoder() {
+     return new PlainEncoder(TSDataType.INT64, 0);
+   }
+ 
+   @Override
+   public List<String> getValueMeasurementIdList() {
+     return Arrays.asList("s1", "s2", "s3");
+   }
+ 
+   @Override
+   public List<TSDataType> getValueTSDataTypeList() {
+     return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
+   }
+ 
+   @Override
+   public List<TSEncoding> getValueTSEncodingList() {
+     return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
+   }
+ 
+   @Override
+   public List<Encoder> getValueEncoderList() {
+     return Arrays.asList(
+         new PlainEncoder(TSDataType.FLOAT, 0),
+         new PlainEncoder(TSDataType.INT32, 0),
+         new PlainEncoder(TSDataType.DOUBLE, 0));
+   }
+ }