Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/29 07:39:11 UTC

[incubator-iotdb] branch refactor_tsfile updated: add all writer V2

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch refactor_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_tsfile by this push:
     new d6fc68b  add  all writer V2
d6fc68b is described below

commit d6fc68ba5471d79c70871eac7a6224bcbbf1e6d7
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Nov 29 15:38:52 2019 +0800

    add  all writer V2
---
 .../tsfile/file/metadataV2/TsFileMetadataV2.java   |  13 +-
 .../apache/iotdb/tsfile/write/TsFileWriterV2.java  | 282 +++++++++++++++
 .../tsfile/write/chunk/ChunkWriterImplV2.java      | 281 +++++++++++++++
 .../iotdb/tsfile/write/page/PageWriterV2.java      | 389 +++++++++++++++++++++
 .../write/record/datapoint/BooleanDataPoint.java   |  10 +
 .../tsfile/write/record/datapoint/DataPoint.java   |   3 +
 .../write/record/datapoint/DoubleDataPoint.java    |  10 +
 .../write/record/datapoint/FloatDataPoint.java     |  10 +
 .../write/record/datapoint/IntDataPoint.java       |  10 +
 .../write/record/datapoint/LongDataPoint.java      |  10 +
 .../write/record/datapoint/StringDataPoint.java    |  10 +
 .../iotdb/tsfile/write/schemaV2/SchemaV2.java      |  18 +-
 .../tsfile/write/schemaV2/TimeseriesSchema.java    |  67 ----
 .../tsfile/write/writer/TsFileIOWriterV2.java      | 371 ++++++++++++++++++++
 14 files changed, 1404 insertions(+), 80 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
index 13f0f44..8a55180 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadataV2/TsFileMetadataV2.java
@@ -33,21 +33,14 @@ import java.util.Map;
 import java.util.TreeMap;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.BloomFilter;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
 
 /**
  * TSFileMetaData collects all metadata info and saves in its data structure.
  */
 public class TsFileMetadataV2 {
 
-  private SchemaV2 schemaV2;
-
   // fields below are IoTDB extensions and they do not affect TsFile's stand-alone functionality
   private int totalChunkNum;
   // invalid means a chunk has been rewritten by merge and the chunk's data is in
@@ -57,11 +50,7 @@ public class TsFileMetadataV2 {
   // bloom filter
   private BloomFilter bloomFilter;
 
-  private List<ChunkMetaData> chunkMetaDataList;
-
-  private TreeMap<Path, List<ChunkMetaData>> timeseriesMetadata;
-
-  private List<Pair<Path, Long>> seriesMetadataIndex;
+  private long[] seriesMetadataIndex;
 
   /**
    * deserialize data from the buffer.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java
new file mode 100644
index 0000000..1f1d9ae
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriterV2.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriterV2;
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileWriterV2 is the entrance of the writing process. It receives a record and sends it to
+ * the corresponding chunk writer. It checks the memory size of all ongoing writes according to its
+ * strategy and flushes the data held in memory to the output stream. At the end of writing, the
+ * user should call {@code close()} to flush the last data and close the output stream.
+ */
+public class TsFileWriterV2 implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TsFileWriterV2.class);
+  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
+  /**
+   * schema of this TsFile.
+   **/
+  protected final SchemaV2 schema;
+  /**
+   * IO writer of this TsFile.
+   **/
+  private final TsFileIOWriterV2 fileWriter;
+  private final int pageSize;
+  private long recordCount = 0;
+
+  private Map<Path, ChunkWriterImplV2> chunkWriters = new HashMap<>();
+
+  /**
+   * min value of threshold of data points num check.
+   **/
+  private long recordCountForNextMemCheck = 100;
+  private long chunkGroupSizeThreshold;
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param file the File to be written by this TsFileWriter
+   */
+  public TsFileWriterV2(File file) throws IOException {
+    this(new TsFileIOWriterV2(file), new SchemaV2(), TSFileDescriptor.getInstance().getConfig());
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param fileWriter the io writer of this TsFile
+   */
+  public TsFileWriterV2(TsFileIOWriterV2 fileWriter) throws IOException {
+    this(fileWriter, new SchemaV2(), TSFileDescriptor.getInstance().getConfig());
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param file the File to be written by this TsFileWriter
+   * @param schema the schema of this TsFile
+   */
+  public TsFileWriterV2(File file, SchemaV2 schema) throws IOException {
+    this(new TsFileIOWriterV2(file), schema, TSFileDescriptor.getInstance().getConfig());
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param output the TsFileOutput of the file to be written by this TsFileWriter
+   * @param schema the schema of this TsFile
+   * @throws IOException
+   */
+  public TsFileWriterV2(TsFileOutput output, SchemaV2 schema) throws IOException {
+    this(new TsFileIOWriterV2(output), schema, TSFileDescriptor.getInstance().getConfig());
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param file the File to be written by this TsFileWriter
+   * @param schema the schema of this TsFile
+   * @param conf the configuration of this TsFile
+   */
+  public TsFileWriterV2(File file, SchemaV2 schema, TSFileConfig conf) throws IOException {
+    this(new TsFileIOWriterV2(file), schema, conf);
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
+   * @param fileWriter the io writer of this TsFile
+   * @param schema the schema of this TsFile
+   * @param conf the configuration of this TsFile
+   */
+  protected TsFileWriterV2(TsFileIOWriterV2 fileWriter, SchemaV2 schema, TSFileConfig conf)
+      throws IOException {
+    if (!fileWriter.canWrite()) {
+      throw new IOException(
+          "the given file Writer does not support writing any more. Maybe it is an complete TsFile");
+    }
+    this.fileWriter = fileWriter;
+    this.schema = schema;
+    this.pageSize = conf.getPageSizeInByte();
+    this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
+    config.setTSFileStorageFs(conf.getTSFileStorageFs().name());
+    if (this.pageSize >= chunkGroupSizeThreshold) {
+      LOG.warn(
+          "TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
+              + " size or decrease page size. ", pageSize, chunkGroupSizeThreshold);
+    }
+  }
+
+  /**
+   * add a measurementSchema to this TsFile.
+   */
+  public void addDeviceTemplates(Map<String, TimeseriesSchema> template) throws WriteProcessException {
+
+  }
+
+  /**
+   * Confirm whether each time series in the record has been registered, and create a chunk
+   * writer for any series that does not have one yet.
+   *
+   * @param record - a record corresponding to a data line
+   * @return - whether the record can be written legally
+   * @throws WriteProcessException if a series in the record has not been registered in the schema
+   */
+  private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
+    for (DataPoint dataPoint: record.dataPointList) {
+      Path path = new Path(record.deviceId, dataPoint.getMeasurementId());
+      if (chunkWriters.containsKey(path)) {
+        continue;
+      }
+
+      if (!schema.containsTimeseries(new Path(record.deviceId, dataPoint.getMeasurementId()))) {
+        throw new WriteProcessException("Time series not registered:" + record.deviceId + dataPoint.getMeasurementId());
+      } else {
+        chunkWriters.put(path, new ChunkWriterImplV2(schema.getSeriesSchema(path)));
+      }
+    }
+
+    return true;
+  }
+
+
+  /**
+   * write a TSRecord into the corresponding chunk writers.
+   *
+   * @param record - a record corresponding to a data line
+   * @return true - size of tsfile or metadata reaches the threshold. false - otherwise
+   * @throws IOException exception in IO
+   * @throws WriteProcessException exception in write process
+   */
+  public boolean write(TSRecord record) throws IOException, WriteProcessException {
+    // make sure the chunk writers for this TSRecord exist
+    checkIsTimeSeriesExist(record);
+    // get the corresponding chunk writer for each data point and write this TSRecord
+
+    for (DataPoint dataPoint: record.dataPointList) {
+      dataPoint.writeTo(record.time, chunkWriters.get(new Path(record.deviceId, dataPoint.getMeasurementId())));
+    }
+
+    ++recordCount;
+    return checkMemorySizeAndMayFlushChunks();
+  }
+
+
+  /**
+   * calculate the total memory size currently occupied by all chunk writers.
+   *
+   * @return total memory size used
+   */
+  private long calculateMemSizeForAllGroup() {
+    long memTotalSize = 0;
+    for (ChunkWriterImplV2 chunkWriter : chunkWriters.values()) {
+      memTotalSize += chunkWriter.estimateMaxSeriesMemSize();
+    }
+    return memTotalSize;
+  }
+
+  /**
+   * check the occupied memory size; if it exceeds the chunk group size threshold, flush the
+   * chunk writers to the output stream.
+   *
+   * @return true - size of tsfile or metadata reaches the threshold. false - otherwise
+   * @throws IOException exception in IO
+   */
+  private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
+    if (recordCount >= recordCountForNextMemCheck) {
+      long memSize = calculateMemSizeForAllGroup();
+      assert memSize > 0;
+      if (memSize > chunkGroupSizeThreshold) {
+        LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
+        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
+        return flushAllChunks();
+      } else {
+        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
+        return false;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * flush the data in all chunk writers and their page writers to the output stream.
+   *
+   * @return true - size of tsfile or metadata reaches the threshold. false - otherwise. This
+   * method always returns false; an override in IoTDB may return true.
+   * @throws IOException exception in IO
+   */
+  private boolean flushAllChunks() throws IOException {
+    for (ChunkWriterImplV2 chunkWriter: chunkWriters.values()) {
+      chunkWriter.writeToFileWriter(fileWriter);
+    }
+    return false;
+  }
+
+
+  /**
+   * call this method to write the last data remaining in memory and close the output stream.
+   *
+   * @throws IOException exception in IO
+   */
+  @Override
+  public void close() throws IOException {
+    LOG.info("start close file");
+    flushAllChunks();
+    fileWriter.endFile(this.schema);
+  }
+
+  /**
+   * this function is only for Test.
+   * @return TsFileIOWriter
+   */
+  public TsFileIOWriterV2 getIOWriter() {
+    return this.fileWriter;
+  }
+
+  /**
+   * this function is only for Test
+   * @throws IOException exception in IO
+   */
+  public void flushForTest() throws IOException {
+    flushAllChunks();
+  }
+}
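
For readers of this change, here is a minimal usage sketch of the new writer; it is not part of the commit itself. It assumes the constructors shown above, the existing TSRecord / LongDataPoint API, and a TimeseriesSchema constructor taking a measurement id, data type, encoding and compression type (its real signature is not visible in this diff):

    import java.io.File;
    import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
    import org.apache.iotdb.tsfile.read.common.Path;
    import org.apache.iotdb.tsfile.write.TsFileWriterV2;
    import org.apache.iotdb.tsfile.write.record.TSRecord;
    import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
    import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
    import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;

    public class TsFileWriterV2Example {
      public static void main(String[] args) throws Exception {
        SchemaV2 schema = new SchemaV2();
        // hypothetical TimeseriesSchema constructor; the real signature is not shown in this diff
        schema.registerTimeseries(new Path("d1", "s1"),
            new TimeseriesSchema("s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY));

        try (TsFileWriterV2 writer = new TsFileWriterV2(new File("example.tsfile"), schema)) {
          TSRecord record = new TSRecord(1L, "d1");        // timestamp, device id
          record.addTuple(new LongDataPoint("s1", 100L));  // measurement id, value
          writer.write(record);                            // may flush chunks once the size threshold is hit
        }                                                  // close() flushes the remaining data and ends the file
      }
    }
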
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java
new file mode 100644
index 0000000..5290240
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImplV2.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.write.page.PageWriter;
+import org.apache.iotdb.tsfile.write.page.PageWriterV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriterV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChunkWriterImplV2 {
+
+  private static final Logger logger = LoggerFactory.getLogger(ChunkWriterImplV2.class);
+
+  private TimeseriesSchema timeseriesSchema;
+
+  private ICompressor compressor;
+
+  /**
+   * all pages of this column.
+   */
+  private PublicBAOS pageBuffer;
+
+  private int numOfPages;
+
+  /**
+   * write data into current page
+   */
+  private PageWriterV2 pageWriter;
+
+  /**
+   * page size threshold.
+   */
+  private final long pageSizeThreshold;
+
+  private final int maxNumberOfPointsInPage;
+
+  /**
+   * number of points to accumulate before the next page-size check.
+   */
+  private int valueCountInOnePageForNextCheck;
+
+  // initial value for valueCountInOnePageForNextCheck
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+
+  /**
+   * statistic of this chunk.
+   */
+  private Statistics<?> chunkStatistics;
+
+  /**
+   * @param schema schema of this measurement
+   */
+  public ChunkWriterImplV2(TimeseriesSchema schema) {
+    this.timeseriesSchema = schema;
+    this.compressor = ICompressor.getCompressor(schema.getCompressionType());
+    this.pageBuffer = new PublicBAOS();
+
+    this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig()
+        .getMaxNumberOfPointsInPage();
+    // delay the first memory-usage check until enough data is written to make a prediction
+    this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+    // init statistics for this chunk and page
+    this.chunkStatistics = Statistics.getStatsByType(timeseriesSchema.getType());
+
+    this.pageWriter = new PageWriterV2(timeseriesSchema);
+    this.pageWriter.setTimeEncoder(timeseriesSchema.getTimeEncoder());
+    this.pageWriter.setValueEncoder(timeseriesSchema.getValueEncoder());
+  }
+
+  public void write(long time, long value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long time, int value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long time, boolean value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long time, float value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long time, double value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long time, Binary value) {
+    pageWriter.write(time, value);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, int[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, long[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, float[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, double[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
+    pageWriter.write(timestamps, values, batchSize);
+    checkPageSizeAndMayOpenANewPage();
+  }
+
+  /**
+   * check the occupied memory size; if it exceeds the page size threshold, seal the current
+   * page and append it to the page buffer.
+   */
+  private void checkPageSizeAndMayOpenANewPage() {
+    if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
+      logger.debug("current line count reaches the upper bound, write page {}", timeseriesSchema);
+      writePage();
+    } else if (pageWriter.getPointNumber()
+        >= valueCountInOnePageForNextCheck) { // need to check memory size
+      // not checking the memory used for every value
+      long currentPageSize = pageWriter.estimateMaxMemSize();
+      if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+        // we will write the current page
+        logger.debug(
+            "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
+            timeseriesSchema.getMeasurementId(), pageSizeThreshold, currentPageSize,
+            pageWriter.getPointNumber());
+        writePage();
+        valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+      } else {
+        // reset the valueCountInOnePageForNextCheck for the next page
+        valueCountInOnePageForNextCheck = (int) (((float) pageSizeThreshold / currentPageSize)
+            * pageWriter.getPointNumber());
+      }
+    }
+  }
+
+  private void writePage() {
+    try {
+      pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer);
+
+      // update statistics of this chunk
+      numOfPages++;
+      this.chunkStatistics.mergeStatistics(pageWriter.getStatistics());
+    } catch (IOException e) {
+      logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+    } finally {
+      // clear start time stamp for next initializing
+      pageWriter.reset(timeseriesSchema);
+    }
+  }
+
+  public void writeToFileWriter(TsFileIOWriterV2 tsfileWriter) throws IOException {
+    // seal current page
+    if (pageWriter.getPointNumber() > 0) {
+      writePage();
+    }
+
+    writeAllPagesOfChunkToTsFile(tsfileWriter, chunkStatistics);
+
+    // reinit this chunk writer
+    pageBuffer.reset();
+    this.chunkStatistics = Statistics.getStatsByType(timeseriesSchema.getType());
+  }
+
+  public long estimateMaxSeriesMemSize() {
+    return pageWriter.estimateMaxMemSize() + this.estimateMaxPageMemSize();
+  }
+
+  public long getCurrentChunkSize() {
+    // return the serialized size of the chunk header + all pages
+    return ChunkHeader.getSerializedSize(timeseriesSchema.getMeasurementId()) + this
+        .getCurrentDataSize();
+  }
+
+  public int getNumOfPages() {
+    return numOfPages;
+  }
+
+  public TSDataType getDataType() {
+    return timeseriesSchema.getType();
+  }
+
+  /**
+   * write all pages of this chunk to the specified IOWriter.
+   *
+   * @param writer     the specified IOWriter
+   * @param statistics the chunk statistics
+   * @throws IOException exception in IO
+   */
+  public void writeAllPagesOfChunkToTsFile(TsFileIOWriterV2 writer, Statistics<?> statistics)
+      throws IOException {
+    if (statistics.getCount() == 0) {
+      return;
+    }
+
+    // start to write this column chunk
+    writer.startFlushChunk(timeseriesSchema, compressor.getType(), timeseriesSchema.getType(),
+        timeseriesSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
+
+    // write all pages of this column
+    writer.writeBytesToStream(pageBuffer);
+
+    writer.endCurrentChunk();
+  }
+
+  /**
+   * estimate max page memory size.
+   *
+   * @return the max possible allocated size currently
+   */
+  private long estimateMaxPageMemSize() {
+    // size of the sealed-page buffer plus the maximum possible header size of the current page
+    return (long) (pageBuffer.size() +
+        PageHeader.calculatePageHeaderSizeWithoutStatistics() +
+        pageWriter.getStatistics().getSerializedSize());
+  }
+
+  /**
+   * get current data size.
+   *
+   * @return current data size that the writer has serialized.
+   */
+  private long getCurrentDataSize() {
+    return pageBuffer.size();
+  }
+}
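
A small illustration, not from the commit, of the adaptive check interval in checkPageSizeAndMayOpenANewPage() above: after a size check the writer extrapolates linearly from the current page size to estimate how many points fit in a full page, and only rechecks once that many points have been written.

    // Standalone sketch of the heuristic; the sizes are made-up example values.
    long pageSizeThreshold = 64 * 1024;   // configured page size in bytes
    long currentPageSize = 32 * 1024;     // estimated page size after pointNumber points
    int pointNumber = 1500;               // MINIMUM_RECORD_COUNT_FOR_CHECK
    int valueCountInOnePageForNextCheck =
        (int) (((float) pageSizeThreshold / currentPageSize) * pointNumber);
    // => 3000: the next size check happens once roughly 3000 points are in the page
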
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java
new file mode 100644
index 0000000..fdacd6c
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriterV2.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.page;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This writer is used to write time-value pairs into a page. It consists of a time encoder,
+ * a value encoder and their respective output streams.
+ */
+public class PageWriterV2 {
+
+  private static final Logger logger = LoggerFactory.getLogger(PageWriterV2.class);
+
+  // time of the latest written time value pair, we assume data is written in time order
+  private long pageMaxTime;
+  private long pageMinTime = Long.MIN_VALUE;
+
+  private ICompressor compressor;
+
+  // time
+  private Encoder timeEncoder;
+  private PublicBAOS timeOut;
+  // value
+  private Encoder valueEncoder;
+  private PublicBAOS valueOut;
+
+  /**
+   * statistics of the current page. They are reset via {@code reset()} after the page is flushed into the chunk buffer.
+   */
+  private Statistics<?> statistics;
+  private int pointNumber;
+
+  public PageWriterV2() {
+    this(null, null);
+  }
+
+  public PageWriterV2(TimeseriesSchema timeseriesSchema) {
+    this(timeseriesSchema.getTimeEncoder(), timeseriesSchema.getValueEncoder());
+    this.statistics = Statistics.getStatsByType(timeseriesSchema.getType());
+    this.compressor = ICompressor.getCompressor(timeseriesSchema.getCompressionType());
+  }
+
+  private PageWriterV2(Encoder timeEncoder, Encoder valueEncoder) {
+    this.timeOut = new PublicBAOS();
+    this.valueOut = new PublicBAOS();
+    this.timeEncoder = timeEncoder;
+    this.valueEncoder = valueEncoder;
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, boolean value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, short value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, int value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, long value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, float value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, double value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write a time value pair into encoder
+   */
+  public void write(long time, Binary value) {
+    ++pointNumber;
+    this.pageMaxTime = time;
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = time;
+    }
+    timeEncoder.encode(time, timeOut);
+    valueEncoder.encode(value, valueOut);
+    statistics.update(time, value);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, int[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, long[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, float[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, double[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * write time series into encoder
+   */
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
+    pointNumber += batchSize;
+    this.pageMaxTime = timestamps[batchSize - 1];
+    if (pageMinTime == Long.MIN_VALUE) {
+      pageMinTime = timestamps[0];
+    }
+    for (int i = 0; i < batchSize; i++) {
+      timeEncoder.encode(timestamps[i], timeOut);
+      valueEncoder.encode(values[i], valueOut);
+    }
+    statistics.update(timestamps, values, batchSize);
+  }
+
+  /**
+   * flush all data remained in encoders.
+   */
+  private void prepareEndWriteOnePage() throws IOException {
+    timeEncoder.flush(timeOut);
+    valueEncoder.flush(valueOut);
+  }
+
+  /**
+   * getUncompressedBytes returns the data that has been written, in the form of
+   * <code>size of time list, time list, value list</code>
+   *
+   * @return a new readable ByteBuffer whose position is 0.
+   */
+  public ByteBuffer getUncompressedBytes() throws IOException {
+    prepareEndWriteOnePage();
+    ByteBuffer buffer = ByteBuffer.allocate(timeOut.size() + valueOut.size() + 4);
+    ReadWriteForEncodingUtils.writeUnsignedVarInt(timeOut.size(), buffer);
+    buffer.put(timeOut.getBuf(), 0, timeOut.size());
+    buffer.put(valueOut.getBuf(), 0, valueOut.size());
+    buffer.flip();
+    return buffer;
+  }
+
+  public long getPageMaxTime() {
+    return pageMaxTime;
+  }
+
+  public long getPageMinTime() {
+    return pageMinTime;
+  }
+
+  /**
+   * write the page header and data into the given page buffer.
+   */
+  public void writePageHeaderAndDataIntoBuff(PublicBAOS pageBuffer) throws IOException {
+    if (pointNumber == 0) {
+      return;
+    }
+
+    ByteBuffer pageData = getUncompressedBytes();
+    int uncompressedSize = pageData.remaining();
+    int compressedSize;
+    int compressedPosition = 0;
+    byte[] compressedBytes = null;
+
+    if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+      compressedSize = pageData.remaining();
+    } else {
+      compressedBytes = new byte[compressor.getMaxBytesForCompression(uncompressedSize)];
+      compressedPosition = 0;
+      // data is never a directByteBuffer now, so we can use data.array()
+      compressedSize = compressor
+          .compress(pageData.array(), pageData.position(), uncompressedSize, compressedBytes);
+    }
+
+    // write the page header to IOWriter
+    PageHeader header = new PageHeader(uncompressedSize, compressedSize, pointNumber, statistics,
+        pageMaxTime, pageMinTime);
+    header.serializeTo(pageBuffer);
+
+    // write page content to temp PBAOS
+    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+      logger.debug("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
+      if (compressor.getType().equals(CompressionType.UNCOMPRESSED)) {
+        channel.write(pageData);
+      } else {
+        pageBuffer.write(compressedBytes, compressedPosition, compressedSize);
+      }
+      logger.debug("start to flush a page data into buffer, buffer position {} ", pageBuffer.size());
+    }
+  }
+
+  /**
+   * calculate the maximum possible memory size this page occupies, including the time output
+   * stream, the value output stream and the maximum bytes the encoders may still flush.
+   *
+   * @return estimated maximum allocated size of this page in bytes
+   */
+  public long estimateMaxMemSize() {
+    return timeOut.size() + valueOut.size() + timeEncoder.getMaxByteSize() + valueEncoder
+        .getMaxByteSize();
+  }
+
+  /**
+   * reset this page
+   */
+  public void reset(TimeseriesSchema measurementSchema) {
+    timeOut.reset();
+    valueOut.reset();
+    pointNumber = 0;
+    pageMinTime = Long.MIN_VALUE;
+    pageMaxTime = Long.MIN_VALUE;
+    statistics = Statistics.getStatsByType(measurementSchema.getType());
+  }
+
+  public void setTimeEncoder(Encoder encoder) {
+    this.timeEncoder = encoder;
+  }
+
+  public void setValueEncoder(Encoder encoder) {
+    this.valueEncoder = encoder;
+  }
+
+  public void initStatistics(TSDataType dataType) {
+    statistics = Statistics.getStatsByType(dataType);
+  }
+
+  public int getPointNumber() {
+    return pointNumber;
+  }
+
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+}
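
For reference, a sketch of how a reader could split the buffer produced by getUncompressedBytes() above, which is laid out as an unsigned varint holding the length of the time section, followed by the time bytes and then the value bytes. This helper is not part of the commit; it assumes java.nio.ByteBuffer and that ReadWriteForEncodingUtils.readUnsignedVarInt reads back the varint written above:

    // Sketch only: splits an uncompressed page into its time and value sections.
    public static ByteBuffer[] splitPage(ByteBuffer page) {
      int timeLength = ReadWriteForEncodingUtils.readUnsignedVarInt(page); // size of the time section
      ByteBuffer timeBuffer = page.slice();
      timeBuffer.limit(timeLength);                        // the encoded timestamps
      page.position(page.position() + timeLength);
      ByteBuffer valueBuffer = page.slice();               // the remaining bytes are the encoded values
      return new ByteBuffer[]{timeBuffer, valueBuffer};
    }
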
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
index 7fd5c19..21a5465 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/BooleanDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,15 @@ public class BooleanDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
index 0d0f057..5ac7286 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.StringContainer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 
 import java.io.IOException;
@@ -107,6 +108,8 @@ public abstract class DataPoint {
    */
   public abstract void writeTo(long time, IChunkWriter writer) throws IOException;
 
+  /** same as {@link #writeTo(long, IChunkWriter)}, but writes to the V2 chunk writer. */
+  public abstract void writeTo(long time, ChunkWriterImplV2 writer) throws IOException;
+
   public String getMeasurementId() {
     return measurementId;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
index de6eb0f..dbeb756 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DoubleDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,15 @@ public class DoubleDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
index af3d919..d613f20 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/FloatDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,15 @@ public class FloatDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
index 38cbfd0..8e695bf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/IntDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,15 @@ public class IntDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
index 2258816..fedd6cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/LongDataPoint.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,15 @@ public class LongDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
index 047a724..c741a10 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/StringDataPoint.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.write.record.datapoint;
 import java.io.IOException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImplV2;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,15 @@ public class StringDataPoint extends DataPoint {
   }
 
   @Override
+  public void writeTo(long time, ChunkWriterImplV2 writer) throws IOException {
+    if (writer == null) {
+      LOG.warn("given IChunkWriter is null, do nothing and return");
+      return;
+    }
+    writer.write(time, value);
+  }
+
+  @Override
   public Object getValue() {
     return value;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
index 60c3d0b..8a89ab9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/SchemaV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.write.schemaV2;
 
+import java.util.LinkedHashMap;
 import org.apache.iotdb.tsfile.read.common.Path;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,13 @@ public class SchemaV2 {
   private Map<Path, TimeseriesSchema> timeseriesSchemaMap;
 
   /**
+   * template name -> (measurement -> TimeseriesSchema)
+   */
+  private Map<String, Map<String, TimeseriesSchema>> deviceTemplates;
+
+  /**
+   * device id -> template name
+   */
+  private Map<String, String> devices;
+
+  /**
    * register a measurement schema map.
    */
   public void registerTimeseries(Path path, TimeseriesSchema descriptor) {
@@ -57,10 +65,18 @@ public class SchemaV2 {
     return timeseriesSchemaMap.get(path);
   }
 
+  public boolean containsDevice(String device) {
+    return devices.containsKey(device);
+  }
+
+  public Map<Path, TimeseriesSchema> getTimeseriesSchemaMap() {
+    return timeseriesSchemaMap;
+  }
+
   /**
    * check if this schema contains a measurement named measurementId.
    */
-  public boolean hasTimeseries(Path path) {
+  public boolean containsTimeseries(Path path) {
     return timeseriesSchemaMap.containsKey(path);
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
index 67b3559..a4a01a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schemaV2/TimeseriesSchema.java
@@ -163,71 +163,4 @@ public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializa
     return compressionType;
   }
 
-  /**
-   * function for serializing data to output stream.
-   */
-  public int serializeTo(OutputStream outputStream) throws IOException {
-    int byteLen = 0;
-
-    byteLen += ReadWriteIOUtils.write(measurementId, outputStream);
-
-    byteLen += ReadWriteIOUtils.write(type, outputStream);
-
-    byteLen += ReadWriteIOUtils.write(encoding, outputStream);
-
-    byteLen += ReadWriteIOUtils.write(compressionType, outputStream);
-
-    if (props == null) {
-      byteLen += ReadWriteIOUtils.write(0, outputStream);
-    } else {
-      byteLen += ReadWriteIOUtils.write(props.size(), outputStream);
-      for (Map.Entry<String, String> entry : props.entrySet()) {
-        byteLen += ReadWriteIOUtils.write(entry.getKey(), outputStream);
-        byteLen += ReadWriteIOUtils.write(entry.getValue(), outputStream);
-      }
-    }
-
-    return byteLen;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    TimeseriesSchema that = (TimeseriesSchema) o;
-    return type == that.type && encoding == that.encoding && Objects
-        .equals(measurementId, that.measurementId)
-        && Objects.equals(compressionType, that.compressionType);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(type, encoding, measurementId, compressionType);
-  }
-
-  /**
-   * compare by measurementID.
-   */
-  @Override
-  public int compareTo(TimeseriesSchema o) {
-    if (equals(o)) {
-      return 0;
-    } else {
-      return this.measurementId.compareTo(o.measurementId);
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringContainer sc = new StringContainer("");
-    sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",",
-        props.toString(), ",",
-        compressionType.toString());
-    sc.addTail("]");
-    return sc.toString();
-  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java
new file mode 100644
index 0000000..56db877
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterV2.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.file.metadataV2.TsFileMetadataV2;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schemaV2.SchemaV2;
+import org.apache.iotdb.tsfile.write.schemaV2.TimeseriesSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileIOWriterV2 is used to construct metadata and write data stored in memory to the output stream.
+ */
+public class TsFileIOWriterV2 {
+
+  public static final byte[] magicStringBytes;
+  public static final byte[] versionNumberBytes;
+  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(TsFileIOWriterV2.class);
+
+  static {
+    magicStringBytes = BytesUtils.stringToBytes(TSFileConfig.MAGIC_STRING);
+    versionNumberBytes = TSFileConfig.VERSION_NUMBER.getBytes();
+  }
+
+  protected TsFileOutput out;
+  protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
+  protected boolean canWrite = true;
+  protected int totalChunkNum = 0;
+  protected int invalidChunkNum;
+  protected File file;
+  private List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
+  private ChunkMetaData currentChunkMetaData;
+  private long markedPosition;
+
+  /**
+   * empty construct function.
+   */
+  protected TsFileIOWriterV2() {
+
+  }
+
+  /**
+   * for writing a new tsfile.
+   *
+   * @param file the file to which the written data is output
+   * @throws IOException if I/O error occurs
+   */
+  public TsFileIOWriterV2(File file) throws IOException {
+    this.out = new DefaultTsFileOutput(file);
+    startFile();
+  }
+
+  /**
+   * for writing a new tsfile.
+   *
+   * @param output the TsFileOutput to which the written data is output
+   */
+  public TsFileIOWriterV2(TsFileOutput output) throws IOException {
+    this.out = output;
+    startFile();
+  }
+
+
+  /**
+   * Writes given bytes to output stream. This method is called when total memory size exceeds the
+   * chunk group size threshold.
+   *
+   * @param bytes - data of several pages which has been packed
+   * @throws IOException if an I/O error occurs.
+   */
+  public void writeBytesToStream(PublicBAOS bytes) throws IOException {
+    bytes.writeTo(out.wrapAsStream());
+  }
+
+  protected void startFile() throws IOException {
+    out.write(magicStringBytes);
+    out.write(versionNumberBytes);
+  }
+
+  /**
+   * start a {@linkplain ChunkMetaData ChunkMetaData}.
+   *
+   * @param timeseriesSchema - schema of this time series
+   * @param compressionCodecName - compression name of this time series
+   * @param tsDataType - data type
+   * @param statistics - Chunk statistics
+   * @param dataSize - the serialized size of all pages
+   * @throws IOException if I/O error occurs
+   */
+  public void startFlushChunk(TimeseriesSchema timeseriesSchema, CompressionType compressionCodecName,
+      TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics,
+      int dataSize, int numOfPages) throws IOException {
+
+    currentChunkMetaData = new ChunkMetaData(timeseriesSchema.getMeasurementId(), tsDataType,
+        out.getPosition(), statistics);
+
+    // flush ChunkHeader to TsFileIOWriter
+    if (logger.isDebugEnabled()) {
+      logger.debug("start series chunk:{}, file position {}", timeseriesSchema, out.getPosition());
+    }
+
+    ChunkHeader header = new ChunkHeader(timeseriesSchema.getMeasurementId(), dataSize, tsDataType,
+        compressionCodecName, encodingType, numOfPages);
+    header.serializeTo(out.wrapAsStream());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("finish series chunk:{} header, file position {}", header, out.getPosition());
+    }
+  }
+
+  /**
+   * Write a whole chunk from another file into this file. This provides fast merge for IoTDB.
+   */
+  public void writeChunk(Chunk chunk, ChunkMetaData chunkMetadata) throws IOException {
+    ChunkHeader chunkHeader = chunk.getHeader();
+    currentChunkMetaData = new ChunkMetaData(chunkHeader.getMeasurementID(),
+        chunkHeader.getDataType(), out.getPosition(), chunkMetadata.getStatistics());
+    chunkHeader.serializeTo(out.wrapAsStream());
+    out.write(chunk.getData());
+    endCurrentChunk();
+    logger.debug("end flushing a chunk:{}, totalvalue:{}", currentChunkMetaData, chunkMetadata.getNumOfPoints());
+  }
+
+  /**
+   * seal the current chunk metadata and update the chunk counter.
+   */
+  public void endCurrentChunk() {
+    chunkMetaDataList.add(currentChunkMetaData);
+    currentChunkMetaData = null;
+    totalChunkNum++;
+  }
+
+  /**
+   * write {@linkplain TsFileMetadataV2 TsFileMetadataV2} to the output stream and close it.
+   *
+   * @param schema the SchemaV2 of this TsFile
+   * @throws IOException if I/O error occurs
+   */
+  public void endFile(SchemaV2 schema) throws IOException {
+
+    // serialize the SEPARATOR of MetaData and ChunkGroups
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+    // get all measurementSchema of this TsFile
+    Map<Path, TimeseriesSchema> schemaDescriptors = schema.getTimeseriesSchemaMap();
+    logger.debug("get time series list:{}", schemaDescriptors);
+
+    long[] tsOffsets = flushAllChunkMetadataList(this.chunkMetaDataList);
+
+    TsFileMetadataV2 tsFileMetaData = new TsFileMetadataV2(tsOffsets);
+
+    tsFileMetaData.setTotalChunkNum(totalChunkNum);
+    tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
+
+    long footerIndex = out.getPosition();
+    logger.debug("start to flush the footer,file pos:{}", footerIndex);
+
+    // write TsFileMetaData
+    int size = tsFileMetaData.serializeTo(out.wrapAsStream());
+    if (logger.isDebugEnabled()) {
+      logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
+    }
+
+    // write bloom filter
+    size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkGroupMetaDataList);
+    if (logger.isDebugEnabled()) {
+      logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
+    }
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());// write the size of the file metadata.
+
+    // write magic string
+    out.write(magicStringBytes);
+
+    // close file
+    out.close();
+    canWrite = false;
+    logger.info("output stream is closed");
+  }
+
+  /**
+   * 1. group the chunk metadata by timeseries, 2. flush the metadata list of each timeseries,
+   * 3. collect the offset of each flushed list.
+   *
+   * @param chunkMetaDataList all chunk metadata in memory
+   * @return offsets of the flushed timeseries metadata, used as the index in TsFileMetadataV2
+   */
+  private long[] flushAllChunkMetadataList(
+      List<ChunkMetaData> chunkMetaDataList) throws IOException {
+
+
+    // group the chunk metadata by timeseries path
+    Map<Path, List<ChunkMetaData>> timeseriesMetadataList = new TreeMap<>();
+
+    // flush timeseriesMetadataList one by one
+
+
+    // return long[]
+
+    return null;
+  }
+
+  /**
+   * group all chunk metadata by timeseries path.
+   *
+   * @param chunkMetaDataList all chunk metadata
+   * @return ChunkMetadata list of each timeseries
+   */
+  private TreeMap<Path, List<ChunkMetaData>> getAllTsMetadata(
+      List<ChunkMetaData> chunkMetaDataList) {
+    TreeMap<Path, List<ChunkMetaData>> timeseriesMetadataList = new TreeMap<>();
+
+    for (ChunkMetaData chunkMetaData: chunkMetaDataList) {
+
+    }
+    return timeseriesMetadataList;
+  }
+
+  /**
+   * get the current length of the output.
+   *
+   * @return - current length of the output
+   * @throws IOException if I/O error occurs
+   */
+  public long getPos() throws IOException {
+    return out.getPosition();
+  }
+
+
+  /**
+   * get chunkGroupMetaDataList.
+   *
+   * @return - List of chunkGroupMetaData
+   */
+  public List<ChunkGroupMetaData> getChunkGroupMetaDatas() {
+    return chunkGroupMetaDataList;
+  }
+
+  public boolean canWrite() {
+    return canWrite;
+  }
+
+  public void mark() throws IOException {
+    markedPosition = getPos();
+  }
+
+  public void reset() throws IOException {
+    out.truncate(markedPosition);
+  }
+
+  /**
+   * close the outputStream or file channel without writing FileMetadata. This is just used for
+   * Testing.
+   */
+  public void close() throws IOException {
+    canWrite = false;
+    out.close();
+  }
+
+  void writeSeparatorMaskForTest() throws IOException {
+    out.write(new byte[]{MetaMarker.SEPARATOR});
+  }
+
+  void writeChunkMaskForTest() throws IOException {
+    out.write(new byte[]{MetaMarker.CHUNK_HEADER});
+  }
+
+
+  public int getTotalChunkNum() {
+    return totalChunkNum;
+  }
+
+  public int getInvalidChunkNum() {
+    return invalidChunkNum;
+  }
+
+  public File getFile() {
+    return file;
+  }
+
+  /**
+   * Remove each ChunkMetadata whose startTime is not in chunkStartTimes.
+   */
+  public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
+    Map<Path, Integer> startTimeIdxes = new HashMap<>();
+    chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+
+    Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
+    while (chunkGroupMetaDataIterator.hasNext()) {
+      ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+      String deviceId = chunkGroupMetaData.getDeviceID();
+      int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
+      Iterator<ChunkMetaData> chunkMetaDataIterator =
+          chunkGroupMetaData.getChunkMetaDataList().iterator();
+      while (chunkMetaDataIterator.hasNext()) {
+        ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
+        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
+        int startTimeIdx = startTimeIdxes.get(path);
+
+        List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
+        boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
+            && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
+        if (!chunkValid) {
+          chunkMetaDataIterator.remove();
+          chunkNum--;
+          invalidChunkNum++;
+        } else {
+          startTimeIdxes.put(path, startTimeIdx + 1);
+        }
+      }
+      if (chunkNum == 0) {
+        chunkGroupMetaDataIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * this function is only for Test.
+   *
+   * @return TsFileOutput
+   */
+  public TsFileOutput getIOWriterOut() {
+    return this.out;
+  }
+}
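
To summarize the on-disk tail that endFile() above produces: the chunk data is followed by the file metadata, the bloom filter, a 4-byte int holding the combined serialized size of both, and finally the magic string. A reader-side sketch, not part of this commit, for locating the start of that metadata block:

    // Sketch only: computes the offset at which the metadata written by endFile() begins,
    // assuming the tail layout described above.
    public static long locateMetadata(java.nio.channels.FileChannel channel) throws java.io.IOException {
      int magicLength = TsFileIOWriterV2.magicStringBytes.length;
      long fileLength = channel.size();
      java.nio.ByteBuffer sizeBuffer = java.nio.ByteBuffer.allocate(Integer.BYTES);
      channel.read(sizeBuffer, fileLength - magicLength - Integer.BYTES); // metadata + bloom filter size
      sizeBuffer.flip();
      int metadataAndBloomSize = sizeBuffer.getInt();
      return fileLength - magicLength - Integer.BYTES - metadataAndBloomSize;
    }
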