You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/04 09:45:31 UTC

[iotdb] branch IOTDB-4517 created (now b873fed67b)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4517
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b873fed67b control metadata size in TsFileIOWriter

This branch includes the following new commits:

     new b873fed67b control metadata size in TsFileIOWriter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: control metadata size in TsFileIOWriter

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4517
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b873fed67b30b0c4b1e9185b9428fb962e34e32c
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Oct 4 17:45:11 2022 +0800

    control metadata size in TsFileIOWriter
---
 .../resources/conf/iotdb-engine.properties         |    8 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   21 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   10 +
 .../db/engine/compaction/CompactionUtils.java      |   79 +-
 .../cross/rewrite/task/SubCompactionTask.java      |    5 +-
 .../utils/AlignedSeriesCompactionExecutor.java     |    1 +
 .../inner/utils/InnerSpaceCompactionUtils.java     |   12 +-
 .../utils/SingleSeriesCompactionExecutor.java      |    1 +
 .../writer/AbstractCompactionWriter.java           |    7 +
 .../writer/CrossSpaceCompactionWriter.java         |   47 +
 .../writer/InnerSpaceCompactionWriter.java         |   34 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   16 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |    9 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |    2 +-
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |    2 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |   12 +-
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |    8 +
 .../file/metadata/MetadataIndexConstructor.java    |   39 +-
 .../tsfile/file/metadata/MetadataIndexNode.java    |    2 +-
 .../iotdb/tsfile/file/metadata/TsFileMetadata.java |   13 +
 .../tsfile/file/metadata/enums/TSDataType.java     |    2 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |    5 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |   20 +-
 .../write/writer/RestorableTsFileIOWriter.java     |   12 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  359 ++++--
 .../write/writer/tsmiterator/DiskTSMIterator.java  |  132 ++
 .../write/writer/tsmiterator/TSMIterator.java      |  147 +++
 .../tsfile/write/MetadataIndexConstructorTest.java |    2 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |    2 +-
 .../tsfile/write/TsFileIntegrityCheckingTool.java  |  251 ++++
 .../writer/TsFileIOWriterMemoryControlTest.java    | 1303 ++++++++++++++++++++
 31 files changed, 2411 insertions(+), 152 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e17a900a25..9f494b582c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -353,6 +353,10 @@ timestamp_precision=ms
 # Datatype: int
 # primitive_array_size=32
 
+# The proportion of write memory that chunk metadata may occupy in a single file writer when flushing memtables
+# Datatype: double
+# chunk_metadata_size_proportion_in_write=0.1
+
 # Ratio of write memory for invoking flush disk, 0.4 by default
 # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
 # Datatype: double
@@ -449,6 +453,10 @@ timestamp_precision=ms
 # BALANCE: alternate two compaction types
 # compaction_priority=BALANCE
 
+# The proportion of the compaction memory budget used for chunk metadata kept in memory during compaction
+# Datatype: double
+# chunk_metadata_size_proportion_in_compaction=0.05
+
 # The target tsfile size in compaction
 # Datatype: long, Unit: byte
 # target_compaction_file_size=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f4cfc65920..c9151ffcf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,6 +128,8 @@ public class IoTDBConfig {
   /** The proportion of write memory for write process */
   private double writeProportion = 0.8;
 
+  private double chunkMetadataSizeProportionInWrite = 0.1;
+
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
@@ -398,6 +400,8 @@ public class IoTDBConfig {
    */
   private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
 
+  private double chunkMetadataSizeProportionInCompaction = 0.05;
+
   /** The target tsfile size in compaction, 1 GB by default */
   private long targetCompactionFileSize = 1073741824L;
 
@@ -2773,4 +2777,21 @@ public class IoTDBConfig {
   public void setCustomizedProperties(Properties customizedProperties) {
     this.customizedProperties = customizedProperties;
   }
+
+  public double getChunkMetadataSizeProportionInWrite() {
+    return chunkMetadataSizeProportionInWrite;
+  }
+
+  public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) {
+    this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite;
+  }
+
+  public double getChunkMetadataSizeProportionInCompaction() {
+    return chunkMetadataSizeProportionInCompaction;
+  }
+
+  public void setChunkMetadataSizeProportionInCompaction(
+      double chunkMetadataSizeProportionInCompaction) {
+    this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 815113f6b4..01606eb873 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -341,6 +341,11 @@ public class IoTDBDescriptor {
                 "max_waiting_time_when_insert_blocked",
                 Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
 
+    conf.setChunkMetadataSizeProportionInCompaction(
+        Double.parseDouble(
+            properties.getProperty(
+                "chunk_metadata_size_proportion_in_compaction",
+                Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
     conf.setEstimatedSeriesSize(
         Integer.parseInt(
             properties.getProperty(
@@ -928,6 +933,11 @@ public class IoTDBDescriptor {
         .setKerberosPrincipal(
             properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
     TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+    conf.setChunkMetadataSizeProportionInWrite(
+        Double.parseDouble(
+            properties.getProperty(
+                "chunk_metadata_size_proportion_in_write",
+                Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));
 
     // timed flush memtable, timed close tsfile
     loadTimedService(properties);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 97bcff78ff..e32930864e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -60,7 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -165,48 +165,51 @@ public class CompactionUtils {
       QueryContext queryContext,
       QueryDataSource queryDataSource)
       throws IOException, InterruptedException {
-    Map<String, MeasurementSchema> measurementSchemaMap =
-        deviceIterator.getAllSchemasOfCurrentDevice();
-    int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
-
-    // assign all measurements to different sub tasks
-    Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
-    int idx = 0;
-    for (String measurement : measurementSchemaMap.keySet()) {
-      if (measurementsForEachSubTask[idx % subTaskNums] == null) {
-        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
+    Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
+    List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
+    allMeasurements.sort((String::compareTo));
+    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+    // construct sub tasks and start compacting measurements in parallel
+    if (subTaskNums > 0) {
+      // assign the measurements for each subtask
+      List<String>[] measurementListArray = new List[subTaskNums];
+      for (int i = 0, size = allMeasurements.size(); i < size; ++i) {
+        int index = i % subTaskNums;
+        if (measurementListArray[index] == null) {
+          measurementListArray[index] = new LinkedList<>();
+        }
+        measurementListArray[index].add(allMeasurements.get(i));
       }
-      measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
-    }
 
-    // construct sub tasks and start compacting measurements in parallel
-    List<Future<Void>> futures = new ArrayList<>();
-    compactionWriter.startChunkGroup(device, false);
-    for (int i = 0; i < subTaskNums; i++) {
-      futures.add(
-          CompactionTaskManager.getInstance()
-              .submitSubTask(
-                  new SubCompactionTask(
-                      device,
-                      measurementsForEachSubTask[i],
-                      queryContext,
-                      queryDataSource,
-                      compactionWriter,
-                      measurementSchemaMap,
-                      i)));
-    }
+      // construct sub tasks and start compacting measurements in parallel
+      List<Future<Void>> futures = new ArrayList<>();
+      compactionWriter.startChunkGroup(device, false);
+      for (int i = 0; i < subTaskNums; i++) {
+        futures.add(
+            CompactionTaskManager.getInstance()
+                .submitSubTask(
+                    new SubCompactionTask(
+                        device,
+                        measurementListArray[i],
+                        queryContext,
+                        queryDataSource,
+                        compactionWriter,
+                        schemaMap,
+                        i)));
+      }
 
-    // wait for all sub tasks finish
-    for (int i = 0; i < subTaskNums; i++) {
-      try {
-        futures.get(i).get();
-      } catch (InterruptedException | ExecutionException e) {
-        logger.error("SubCompactionTask meet errors ", e);
-        Thread.interrupted();
-        throw new InterruptedException();
+      // wait for all sub tasks finish
+      for (int i = 0; i < subTaskNums; i++) {
+        try {
+          futures.get(i).get();
+        } catch (InterruptedException | ExecutionException e) {
+          logger.error("SubCompactionTask meet errors ", e);
+          Thread.interrupted();
+          throw new InterruptedException();
+        }
       }
     }
-
+    compactionWriter.checkAndMayFlushChunkMetadata();
     compactionWriter.endChunkGroup();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index c688deb8bd..f5d5278437 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 
 /**
@@ -45,7 +44,7 @@ public class SubCompactionTask implements Callable<Void> {
   private static final Logger logger =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private final String device;
-  private final Set<String> measurementList;
+  private final List<String> measurementList;
 
   private final QueryContext queryContext;
   private final QueryDataSource queryDataSource;
@@ -57,7 +56,7 @@ public class SubCompactionTask implements Callable<Void> {
 
   public SubCompactionTask(
       String device,
-      Set<String> measurementList,
+      List<String> measurementList,
       QueryContext queryContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index af5353153f..dbd5b98bec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -135,6 +135,7 @@ public class AlignedSeriesCompactionExecutor {
           chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
     }
+    writer.checkMetadataSizeAndMayFlush();
   }
 
   private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 06004c9c6e..a6248e9835 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -64,8 +65,17 @@ public class InnerSpaceCompactionUtils {
   public static void compact(TsFileResource targetResource, List<TsFileResource> tsFileResources)
       throws IOException, MetadataException, InterruptedException {
 
+    // file writer gets a configurable share (default 5%) of each compaction task's memory budget
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
     try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(tsFileResources);
-        TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
+        TsFileIOWriter writer =
+            new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
       while (deviceIterator.hasNextDevice()) {
         Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
         String device = deviceInfo.left;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..d614b3dbe3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -138,6 +138,7 @@ public class SingleSeriesCompactionExecutor {
     }
     targetResource.updateStartTime(device, minStartTimestamp);
     targetResource.updateEndTime(device, maxEndTimestamp);
+    fileWriter.checkMetadataSizeAndMayFlush();
   }
 
   private void constructChunkWriterFromReadChunk(Chunk chunk) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 5c1460230d..72096069e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -177,4 +177,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   }
 
   public abstract List<TsFileIOWriter> getFileIOWriter();
+
+  public void checkAndMayFlushChunkMetadata() throws IOException {
+    List<TsFileIOWriter> writers = this.getFileIOWriter();
+    for (TsFileIOWriter writer : writers) {
+      writer.checkMetadataSizeAndMayFlush();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 3e245cfc35..3a413d4cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -27,6 +29,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // target fileIOWriters
@@ -34,6 +38,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
 
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
+  private List<TsFileResource> targetTsFileResources;
 
   // Each sub task has its corresponding seq file index.
   // The index of the array corresponds to subTaskId.
@@ -51,17 +56,46 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // current chunk group header size
   private int chunkGroupHeaderSize;
 
+  private AtomicLong[] startTimeForCurDeviceForEachFile;
+  private AtomicLong[] endTimeForCurDeviceForEachFile;
+  private AtomicBoolean[] hasCurDeviceForEachFile;
+  private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][];
+  private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][];
+
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
       throws IOException {
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
     isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    this.targetTsFileResources = targetResources;
+    startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()];
+    long memorySizeForEachWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction()
+                / targetResources.size());
     for (int i = 0; i < targetResources.size(); i++) {
       this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
       isEmptyFile[i] = true;
+      startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE);
+      hasCurDeviceForEachFile[i] = new AtomicBoolean(false);
     }
     this.seqTsFileResources = seqFileResources;
+    for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) {
+      startTimeForEachDevice[i] = new AtomicLong[size];
+      endTimeForEachDevice[i] = new AtomicLong[size];
+      for (int j = 0; j < size; ++j) {
+        startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE);
+        endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE);
+      }
+    }
   }
 
   @Override
@@ -86,6 +120,16 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
       }
       isDeviceExistedInTargetFiles[i] = false;
     }
+    for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) {
+      for (int j = 0; j < subTaskNum; ++j) {
+        targetTsFileResources
+            .get(i)
+            .updateStartTime(deviceId, startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE));
+        targetTsFileResources
+            .get(i)
+            .updateEndTime(deviceId, endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE));
+      }
+    }
     deviceId = null;
   }
 
@@ -99,6 +143,9 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   public void write(long timestamp, Object value, int subTaskId) throws IOException {
     checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
     writeDataPoint(timestamp, value, subTaskId);
+    int fileIndex = seqFileIndexArray[subTaskId];
+    startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::min);
+    endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::max);
     checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
     isEmptyFile[seqFileIndexArray[subTaskId]] = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index af2cc53c67..18fa51d7a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -18,21 +18,43 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   private TsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
+  private TsFileResource resource;
+  private AtomicLong[] startTimeOfCurDevice;
+  private AtomicLong[] endTimeOfCurDevice;
 
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
-    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
+    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter);
     isEmptyFile = true;
+    resource = targetFileResource;
+    int concurrentThreadNum =
+        Math.max(1, IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+    startTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    endTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    for (int i = 0; i < concurrentThreadNum; ++i) {
+      startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE);
+    }
   }
 
   @Override
@@ -44,6 +66,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
 
   @Override
   public void endChunkGroup() throws IOException {
+    for (int i = 0; i < startTimeOfCurDevice.length; ++i) {
+      resource.updateStartTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), startTimeOfCurDevice[i].get());
+      resource.updateEndTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), endTimeOfCurDevice[i].get());
+      startTimeOfCurDevice[i].set(Long.MAX_VALUE);
+      endTimeOfCurDevice[i].set(Long.MIN_VALUE);
+    }
     fileWriter.endChunkGroup();
   }
 
@@ -57,6 +87,8 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
     writeDataPoint(timestamp, value, subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
     isEmptyFile = false;
+    startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(), timestamp));
+    endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(), timestamp));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 2fd5f4db64..5ab620f7fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -39,6 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -114,12 +117,14 @@ public class MemTableFlushTask {
     long start = System.currentTimeMillis();
     long sortTime = 0;
 
-    // for map do not use get(key) to iterate
-    for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry :
-        memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID()));
+    Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+    List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+    // sort the IDeviceID in lexicographical order
+    deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+    for (IDeviceID deviceID : deviceIDList) {
+      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID()));
 
-      final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
+      final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
@@ -275,6 +280,7 @@ public class MemTableFlushTask {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
+              writer.checkMetadataSizeAndMayFlush();
             } else {
               ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1cb31057e5..a56061935a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -172,7 +172,14 @@ public class TsFileProcessor {
     this.storageGroupName = storageGroupName;
     this.tsFileResource = new TsFileResource(tsfile, this);
     this.storageGroupInfo = storageGroupInfo;
-    this.writer = new RestorableTsFileIOWriter(tsfile);
+    this.writer =
+        new RestorableTsFileIOWriter(
+            tsfile,
+            (long)
+                (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                    * IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getChunkMetadataSizeProportionInWrite()));
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
     logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 8c26ec903c..6ec8e69fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1423,7 +1423,7 @@ public class PlanExecutor implements IPlanExecutor {
   private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
       throws MetadataException, QueryProcessException, IOException {
     Map<String, List<TimeseriesMetadata>> metadataSet =
-        tsFileSequenceReader.getAllTimeseriesMetadata();
+        tsFileSequenceReader.getAllTimeseriesMetadata(false);
     for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
       String deviceId = entry.getKey();
       PartialPath devicePath = new PartialPath(deviceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 5d61e6c304..6273add29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -69,7 +69,7 @@ public class FileLoaderUtils {
   public static void updateTsFileResource(
       TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException {
     for (Entry<String, List<TimeseriesMetadata>> entry :
-        reader.getAllTimeseriesMetadata().entrySet()) {
+        reader.getAllTimeseriesMetadata(false).entrySet()) {
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
         tsFileResource.updateStartTime(
             entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b7722b67dd..dd87894e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.writelog.recover;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -113,7 +114,14 @@ public class TsFileRecoverPerformer {
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
     try {
-      restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+      restorableTsFileIOWriter =
+          new RestorableTsFileIOWriter(
+              file,
+              (long)
+                  (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                      * IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getChunkMetadataSizeProportionInWrite()));
     } catch (NotCompatibleTsFileException e) {
       boolean result = file.delete();
       logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
@@ -180,7 +188,7 @@ public class TsFileRecoverPerformer {
     try (TsFileSequenceReader reader =
         new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) {
       for (Entry<String, List<TimeseriesMetadata>> entry :
-          reader.getAllTimeseriesMetadata().entrySet()) {
+          reader.getAllTimeseriesMetadata(false).entrySet()) {
         for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
           tsFileResource.updateStartTime(
               entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..9ee1f7f566 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata {
     return chunkMetaData;
   }
 
+  public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+    ChunkMetadata chunkMetadata = new ChunkMetadata();
+    chunkMetadata.tsDataType = dataType;
+    chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+    chunkMetadata.statistics = Statistics.deserialize(buffer, dataType);
+    return chunkMetadata;
+  }
+
   @Override
   public long getVersion() {
     return version;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..baa93ce9da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -123,7 +123,7 @@ public class MetadataIndexConstructor {
    * @param out tsfile output
    * @param type MetadataIndexNode type
    */
-  private static MetadataIndexNode generateRootNode(
+  public static MetadataIndexNode generateRootNode(
       Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
       throws IOException {
     int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +148,7 @@ public class MetadataIndexConstructor {
     return metadataIndexNodeQueue.poll();
   }
 
-  private static void addCurrentIndexNodeToQueue(
+  public static void addCurrentIndexNodeToQueue(
       MetadataIndexNode currentIndexNode,
       Queue<MetadataIndexNode> metadataIndexNodeQueue,
       TsFileOutput out)
@@ -156,4 +156,39 @@ public class MetadataIndexConstructor {
     currentIndexNode.setEndOffset(out.getPosition());
     metadataIndexNodeQueue.add(currentIndexNode);
   }
+
+  public static MetadataIndexNode checkAndBuildLevelIndex(
+      Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException {
+    // if not exceed the max child nodes num, ignore the device index and directly point to the
+    // measurement
+    if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
+      MetadataIndexNode metadataIndexNode =
+          new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+        metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+        entry.getValue().serializeTo(out.wrapAsStream());
+      }
+      metadataIndexNode.setEndOffset(out.getPosition());
+      return metadataIndexNode;
+    }
+
+    // else, build level index for devices
+    Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
+    MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+
+    for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+      // when constructing from internal node, each node is related to an entry
+      if (currentIndexNode.isFull()) {
+        addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+        currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+      }
+      currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+      entry.getValue().serializeTo(out.wrapAsStream());
+    }
+    addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+    MetadataIndexNode deviceMetadataIndexNode =
+        generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
+    deviceMetadataIndexNode.setEndOffset(out.getPosition());
+    return deviceMetadataIndexNode;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
     this.children.add(metadataIndexEntry);
   }
 
-  boolean isFull() {
+  public boolean isFull() {
     return children.size() >= config.getMaxDegreeOfIndexNode();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..7b70f54ebe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -119,6 +119,19 @@ public class TsFileMetadata {
     return byteLen;
   }
 
+  public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+      throws IOException {
+    int byteLen = 0;
+    byte[] bytes = filter.serialize();
+    byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
+    outputStream.write(bytes);
+    byteLen += bytes.length;
+    byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getSize(), outputStream);
+    byteLen +=
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getHashFunctionSize(), outputStream);
+    return byteLen;
+  }
+
   /**
    * build bloom filter
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 48d9b2a38c..73ae05703c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -62,7 +62,7 @@ public enum TSDataType {
     return getTsDataType(type);
   }
 
-  private static TSDataType getTsDataType(byte type) {
+  public static TSDataType getTsDataType(byte type) {
     switch (type) {
       case 0:
         return TSDataType.BOOLEAN;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 90ec11349f..3a413b2f98 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -929,7 +929,8 @@ public class TsFileSequenceReader implements AutoCloseable {
   }
 
   /* TimeseriesMetadata don't need deserialize chunk metadata list */
-  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+      throws IOException {
     if (tsFileMetaData == null) {
       readFileMetadata();
     }
@@ -949,7 +950,7 @@ public class TsFileSequenceReader implements AutoCloseable {
           null,
           metadataIndexNode.getNodeType(),
           timeseriesMetadataMap,
-          false);
+          needChunkMetadata);
     }
     return timeseriesMetadataMap;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 49553e42d9..00536ca1dc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -22,7 +22,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -41,8 +47,15 @@ import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable {
@@ -413,7 +426,8 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
   }
 
   @Override
-  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+      throws IOException {
     if (tsFileMetaData == null) {
       readFileMetadata();
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 70a5d8cf9f..ae2afdb8e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     this(file, true);
   }
 
+  /**
+   * @param file a given tsfile path you want to (continue to) write
+   * @throws IOException if write failed, or the file is broken but autoRepair==false.
+   */
+  public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    this(file, true);
+    this.maxMetadataSize = maxMetadataSize;
+    this.enableMemoryControl = true;
+    this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    this.checkMetadataSizeAndMayFlush();
+  }
+
   public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
     if (logger.isDebugEnabled()) {
       logger.debug("{} is opened.", file.getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f..7fdbac27df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,35 +26,47 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
 import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.TreeMap;
 
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
 /**
  * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
  */
@@ -93,6 +105,21 @@ public class TsFileIOWriter implements AutoCloseable {
   private long minPlanIndex;
   private long maxPlanIndex;
 
+  // the following variables are used for memory control
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total num of path in order to make bloom filter
+  protected int pathCount = 0;
+  protected boolean enableMemoryControl = false;
+  private Path lastSerializePath = null;
+  protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+  private volatile int chunkMetadataCount = 0;
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
+
   /** empty construct function. */
   protected TsFileIOWriter() {}
 
@@ -126,6 +153,15 @@ public class TsFileIOWriter implements AutoCloseable {
     this.out = output;
   }
 
+  /** for writing with memory control */
+  public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+      throws IOException {
+    this(file);
+    this.enableMemoryControl = enableMemoryControl;
+    this.maxMetadataSize = maxMetadataSize;
+    chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total memory size exceeds the
    * chunk group size threshold.
@@ -236,6 +272,10 @@ public class TsFileIOWriter implements AutoCloseable {
 
   /** end chunk and write some log. */
   public void endCurrentChunk() {
+    if (enableMemoryControl) {
+      this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    }
+    chunkMetadataCount++;
     chunkMetadataList.add(currentChunkMetadata);
     currentChunkMetadata = null;
   }
@@ -247,47 +287,14 @@ public class TsFileIOWriter implements AutoCloseable {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void endFile() throws IOException {
-    long metaOffset = out.getPosition();
-
-    // serialize the SEPARATOR of MetaData
-    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
-    // group ChunkMetadata by series
-    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
-    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
-      List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
-      for (IChunkMetadata chunkMetadata : chunkMetadatas) {
-        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
-        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
-      }
-    }
-
-    MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
-    TsFileMetadata tsFileMetaData = new TsFileMetadata();
-    tsFileMetaData.setMetadataIndex(metadataIndex);
-    tsFileMetaData.setMetaOffset(metaOffset);
+    checkInMemoryPathCount();
+    readChunkMetadataAndConstructIndexTree();
 
     long footerIndex = out.getPosition();
     if (logger.isDebugEnabled()) {
       logger.debug("start to flush the footer,file pos:{}", footerIndex);
     }
 
-    // write TsFileMetaData
-    int size = tsFileMetaData.serializeTo(out.wrapAsStream());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
-    }
-
-    // write bloom filter
-    size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
-    }
-
-    // write TsFileMetaData size
-    ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
-
     // write magic string
     out.write(MAGIC_STRING_BYTES);
 
@@ -296,66 +303,121 @@ public class TsFileIOWriter implements AutoCloseable {
     if (resourceLogger.isDebugEnabled() && file != null) {
       resourceLogger.debug("{} writer is closed.", file.getName());
     }
+    if (file != null) {
+      File chunkMetadataFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+      if (chunkMetadataFile.exists()) {
+        FileUtils.delete(chunkMetadataFile);
+      }
+    }
     canWrite = false;
   }
 
-  /**
-   * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
-   *
-   * @param chunkMetadataListMap chunkMetadata that Path.mask == 0
-   * @return MetadataIndexEntry list in TsFileMetadata
-   */
-  private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
-      throws IOException {
-
-    // convert ChunkMetadataList to this field
-    deviceTimeseriesMetadataMap = new LinkedHashMap<>();
-    // create device -> TimeseriesMetaDataList Map
-    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      // for ordinary path
-      flushOneChunkMetadata(entry.getKey(), entry.getValue());
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    if (tempOutput != null) {
+      tempOutput.close();
     }
+    long metaOffset = out.getPosition();
 
-    // construct TsFileMetadata and return
-    return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
-  }
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
-  /**
-   * Flush one chunkMetadata
-   *
-   * @param path Path of chunk
-   * @param chunkMetadataList List of chunkMetadata about path(previous param)
-   */
-  private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
-      throws IOException {
-    // create TimeseriesMetaData
-    PublicBAOS publicBAOS = new PublicBAOS();
-    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
-    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
-    int chunkMetadataListLength = 0;
-    boolean serializeStatistic = (chunkMetadataList.size() > 1);
-    // flush chunkMetadataList one by one
-    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-      if (!chunkMetadata.getDataType().equals(dataType)) {
-        continue;
+    TSMIterator tsmIterator =
+        hasChunkMetadataInDisk
+            ? TSMIterator.getTSMIteratorInDisk(
+                chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+            : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+    Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+    String currentDevice = null;
+    String prevDevice = null;
+    MetadataIndexNode currentIndexNode =
+        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    int seriesIdxForCurrDevice = 0;
+    BloomFilter filter =
+        BloomFilter.getEmptyBloomFilter(
+            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+    int indexCount = 0;
+    while (tsmIterator.hasNext()) {
+      // read in all chunk metadata of one series
+      // construct the timeseries metadata for this series
+      Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
+      TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+      currentSeries = timeseriesMetadataPair.left;
+
+      indexCount++;
+      // build bloom filter
+      filter.add(currentSeries);
+      // construct the index tree node for the series
+      Path currentPath = null;
+      if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+        // this series is the time column of the aligned device
+        // the full series path will be like "root.sg.d."
+        // we remove the last . in the series id here
+        currentPath = new Path(currentSeries);
+        currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+      } else {
+        currentPath = new Path(currentSeries, true);
+        currentDevice = currentPath.getDevice();
+      }
+      if (!currentDevice.equals(prevDevice)) {
+        if (prevDevice != null) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          deviceMetadataIndexMap.put(
+              prevDevice,
+              generateRootNode(
+                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        measurementMetadataIndexQueue = new ArrayDeque<>();
+        seriesIdxForCurrDevice = 0;
+      }
+
+      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+          currentIndexNode.addEntry(
+              new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+        } else {
+          currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+        }
       }
-      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
-      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+
+      prevDevice = currentDevice;
+      seriesIdxForCurrDevice++;
+      // serialize the timeseries metadata to file
+      timeseriesMetadata.serializeTo(out.wrapAsStream());
+    }
+
+    addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+    if (prevDevice != null) {
+      deviceMetadataIndexMap.put(
+          prevDevice,
+          generateRootNode(
+              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
     }
 
-    TimeseriesMetadata timeseriesMetadata =
-        new TimeseriesMetadata(
-            (byte)
-                ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
-            chunkMetadataListLength,
-            path.getMeasurement(),
-            dataType,
-            seriesStatistics,
-            publicBAOS);
-    deviceTimeseriesMetadataMap
-        .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
-        .add(timeseriesMetadata);
+    MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+    TsFileMetadata tsFileMetadata = new TsFileMetadata();
+    tsFileMetadata.setMetadataIndex(metadataIndex);
+    tsFileMetadata.setMetaOffset(metaOffset);
+
+    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());
+  }
+
+  private void checkInMemoryPathCount() {
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      pathCount += chunkGroupMetadata.getChunkMetadataList().size();
+    }
   }
 
   /**
@@ -399,6 +461,9 @@ public class TsFileIOWriter implements AutoCloseable {
   public void close() throws IOException {
     canWrite = false;
     out.close();
+    if (tempOutput != null) {
+      this.tempOutput.close();
+    }
   }
 
   void writeSeparatorMaskForTest() throws IOException {
@@ -477,6 +542,30 @@ public class TsFileIOWriter implements AutoCloseable {
    * @return DeviceTimeseriesMetadataMap
    */
   public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
+    Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
+    Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        chunkMetadataMap
+            .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
+            .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+    for (String device : chunkMetadataMap.keySet()) {
+      Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
+      for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
+        try {
+          deviceTimeseriesMetadataMap
+              .computeIfAbsent(device, x -> new ArrayList<>())
+              .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
+        } catch (IOException e) {
+          logger.error("Failed to get device timeseries metadata map", e);
+          return null;
+        }
+      }
+    }
+
     return deviceTimeseriesMetadataMap;
   }
 
@@ -495,4 +584,98 @@ public class TsFileIOWriter implements AutoCloseable {
   public void setMaxPlanIndex(long maxPlanIndex) {
     this.maxPlanIndex = maxPlanIndex;
   }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
+   * by row, make sure all data of the device being written has been written before this method is
+   * called. For writing non-aligned series, or writing aligned series by column, make sure that
+   * all data of one series has been written before you call this method.</b>
+   *
+   * @throws IOException if flushing the chunk metadata to the temp file fails
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Flushing chunk metadata, total size is {}, count is {}, avg size is {}",
+              currentChunkMetadataSize,
+              chunkMetadataCount,
+              currentChunkMetadataSize / chunkMetadataCount);
+        }
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+        TSMIterator.sortChunkMetadata(
+            chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
+      Path seriesPath = pair.left;
+      boolean isNewPath = !seriesPath.equals(lastSerializePath);
+      if (isNewPath) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = pair.right;
+      writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+      lastSerializePath = seriesPath;
+      logger.debug("Flushing {}", seriesPath);
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+    chunkMetadataCount = 0;
+    currentChunkMetadataSize = 0;
+  }
+
+  private void writeChunkMetadataToTempFile(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
+      throws IOException {
+    // [DeviceId] measurementId datatype size chunkMetadataBuffer
+    if (lastSerializePath == null
+        || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
+      // mark the end position of last device
+      endPosInCMTForDevice.add(tempOutput.getPosition());
+      // serialize the device
+      // for each device, we only serialize it once, in order to save io
+      ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+    }
+    if (isNewPath && iChunkMetadataList.size() > 0) {
+      // serialize the public info of this measurement
+      ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+      ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+    }
+    PublicBAOS buffer = new PublicBAOS();
+    int totalSize = 0;
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      totalSize += chunkMetadata.serializeTo(buffer, true);
+    }
+    ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+    buffer.writeTo(tempOutput);
+  }
+
+  public String getCurrentChunkGroupDeviceId() {
+    return currentChunkGroupDeviceId;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (.cmt file) and memory (list of
+ * ChunkGroupMetadata), and constructs TimeseriesMetadata from them. It reads the ChunkMetadata on
+ * disk first, and once all of it has been read, it reads the ChunkMetadata in memory.
+ */
+public class DiskTSMIterator extends TSMIterator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class);
+
+  private final LinkedList<Long> endPosForEachDevice;
+  private final File cmtFile;
+  private final LocalTsFileInput input;
+  private final long fileLength;
+  private long currentPos = 0;
+  private long nextEndPosForDevice = 0;
+  private String currentDevice;
+  private boolean remainsInFile = true;
+
+  protected DiskTSMIterator(
+      File cmtFile,
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      LinkedList<Long> endPosForEachDevice)
+      throws IOException {
+    super(chunkGroupMetadataList);
+    this.cmtFile = cmtFile;
+    this.endPosForEachDevice = endPosForEachDevice;
+    this.input = new LocalTsFileInput(cmtFile.toPath());
+    this.fileLength = cmtFile.length();
+    this.nextEndPosForDevice = endPosForEachDevice.removeFirst();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return remainsInFile || iterator.hasNext();
+  }
+
+  /**
+   * Reads from the .cmt file while data remains there, then from the in-memory iterator.
+   *
+   * @throws IOException if the read fails; propagated so callers never receive {@code null}
+   */
+  @Override
+  public Pair<String, TimeseriesMetadata> next() throws IOException {
+    try {
+      return remainsInFile ? getTimeseriesMetadataFromFile() : super.next();
+    } catch (IOException e) {
+      LOG.error("Meets IOException when reading timeseries metadata from disk", e);
+      throw e;
+    }
+  }
+
+  /** Decodes one series record (and, at a device boundary, the device name) from the file. */
+  private Pair<String, TimeseriesMetadata> getTimeseriesMetadataFromFile() throws IOException {
+    if (currentPos == nextEndPosForDevice) {
+      // deserialize the current device name
+      currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream());
+      nextEndPosForDevice =
+          endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength;
+    }
+    // deserialize public info for measurement
+    String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream());
+    byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream());
+    TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte);
+    int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream());
+    ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize);
+    int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+    if (readSize < chunkBufferSize) {
+      throw new IOException(
+          String.format(
+              "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize));
+    }
+    chunkBuffer.flip();
+
+    // deserialize chunk metadata from chunk buffer
+    List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+    while (chunkBuffer.hasRemaining()) {
+      chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType));
+    }
+    updateCurrentPos();
+    return new Pair<>(
+        currentDevice + "." + measurementUid,
+        constructOneTimeseriesMetadata(measurementUid, chunkMetadataList));
+  }
+
+  private void updateCurrentPos() throws IOException {
+    currentPos = input.position();
+    if (currentPos >= fileLength) {
+      remainsInFile = false;
+      input.close();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
new file mode 100644
index 0000000000..f11242f296
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * TSMIterator hands out, one series at a time, the full path of a series and its
+ * TimeseriesMetadata. Its data source is memory or disk: getTSMIteratorInMemory returns an
+ * iterator that reads from memory, getTSMIteratorInDisk returns one that reads from disk.
+ */
+public class TSMIterator {
+  private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class);
+  protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList;
+  protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator;
+
+  protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null);
+    this.iterator = sortedChunkMetadataList.iterator();
+  }
+
+  /** Creates an iterator over chunk metadata that is held entirely in memory. */
+  public static TSMIterator getTSMIteratorInMemory(
+      List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    return new TSMIterator(chunkGroupMetadataList);
+  }
+
+  /** Creates an iterator that reads cmtFile before the in-memory chunk metadata. */
+  public static TSMIterator getTSMIteratorInDisk(
+      File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos)
+      throws IOException {
+    return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos);
+  }
+
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  /** Returns the full path of the next series paired with its TimeseriesMetadata. */
+  public Pair<String, TimeseriesMetadata> next() throws IOException {
+    Pair<Path, List<IChunkMetadata>> entry = iterator.next();
+    String fullPath = entry.left.getFullPath();
+    TimeseriesMetadata metadata =
+        constructOneTimeseriesMetadata(entry.left.getMeasurement(), entry.right);
+    return new Pair<>(fullPath, metadata);
+  }
+
+  /** Serializes the chunk metadata of one series and wraps it in a TimeseriesMetadata. */
+  public static TimeseriesMetadata constructOneTimeseriesMetadata(
+      String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException {
+    // the data type of the last chunk metadata is used as the type of the whole series
+    PublicBAOS serializedChunks = new PublicBAOS();
+    int lastIndex = chunkMetadataList.size() - 1;
+    TSDataType seriesType = chunkMetadataList.get(lastIndex).getDataType();
+    Statistics mergedStatistics = Statistics.getStatsByType(seriesType);
+    // chunk statistics are serialized only when the series has more than one chunk metadata
+    boolean withStatistics = chunkMetadataList.size() > 1;
+    int serializedLength = 0;
+    for (IChunkMetadata current : chunkMetadataList) {
+      // chunk metadata of any other data type is left out
+      if (current.getDataType().equals(seriesType)) {
+        serializedLength += current.serializeTo(serializedChunks, withStatistics);
+        mergedStatistics.mergeStatistics(current.getStatistics());
+      }
+    }
+
+    // OR the statistics flag (1 or 0) with the mask of the first chunk metadata
+    byte statisticsFlag = withStatistics ? (byte) 1 : (byte) 0;
+    byte typeByte = (byte) (statisticsFlag | chunkMetadataList.get(0).getMask());
+    return new TimeseriesMetadata(
+        typeByte, serializedLength, measurementId, seriesType, mergedStatistics, serializedChunks);
+  }
+
+  /** Groups chunk metadata by device, then series path, and flattens it in TreeMap key order. */
+  public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      String currentDevice,
+      List<ChunkMetadata> chunkMetadataList) {
+    // device -> series path -> chunk metadata; both levels are TreeMaps
+    Map<String, Map<Path, List<IChunkMetadata>>> groupedByDevice = new TreeMap<>();
+    for (ChunkGroupMetadata group : chunkGroupMetadataList) {
+      String device = group.getDevice();
+      // a device gets its entry even when the chunk group carries no chunk metadata
+      Map<Path, List<IChunkMetadata>> seriesOfDevice =
+          groupedByDevice.computeIfAbsent(device, key -> new TreeMap<>());
+      for (IChunkMetadata metadata : group.getChunkMetadataList()) {
+        Path seriesPath = new Path(device, metadata.getMeasurementUid());
+        seriesOfDevice.computeIfAbsent(seriesPath, key -> new ArrayList<>()).add(metadata);
+      }
+    }
+    if (currentDevice != null) {
+      // chunk metadata handed in separately is filed under currentDevice
+      for (IChunkMetadata metadata : chunkMetadataList) {
+        Path seriesPath = new Path(currentDevice, metadata.getMeasurementUid());
+        groupedByDevice
+            .computeIfAbsent(currentDevice, key -> new TreeMap<>())
+            .computeIfAbsent(seriesPath, key -> new ArrayList<>())
+            .add(metadata);
+      }
+    }
+
+    // flatten the two-level map into one list, keeping the map iteration order
+    List<Pair<Path, List<IChunkMetadata>>> flattened = new LinkedList<>();
+    for (Map<Path, List<IChunkMetadata>> seriesOfDevice : groupedByDevice.values()) {
+      for (Map.Entry<Path, List<IChunkMetadata>> series : seriesOfDevice.entrySet()) {
+        flattened.add(new Pair<>(series.getKey(), series.getValue()));
+      }
+    }
+    return flattened;
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
index 5e518d4280..f7137e95be 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
@@ -234,7 +234,7 @@ public class MetadataIndexConstructorTest {
       assertFalse(iterator.hasNext());
 
       Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
-          reader.getAllTimeseriesMetadata();
+          reader.getAllTimeseriesMetadata(false);
       for (int j = 0; j < actualDevices.size(); j++) {
         for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
           assertEquals(
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 4c2321928a..ba955912f1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -147,7 +147,7 @@ public class TsFileIOWriterTest {
 
     // make sure timeseriesMetadata is only
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
-        reader.getAllTimeseriesMetadata();
+        reader.getAllTimeseriesMetadata(false);
     Set<String> pathSet = new HashSet<>();
     for (Map.Entry<String, List<TimeseriesMetadata>> entry :
         deviceTimeseriesMetadataMap.entrySet()) {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides static methods to check the integrity of a tsfile. */
+public class TsFileIntegrityCheckingTool {
+  private static final Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+  /**
+   * Checks the integrity of a file by reading it sequentially from the start until the separator
+   * marker, decoding every page of every chunk on the way. Fails the test on an IOException.
+   *
+   * @param filename path of the tsfile to check
+   */
+  public static void checkIntegrityBySequenceRead(String filename) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      String headMagicString = reader.readHeadMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+      String tailMagicString = reader.readTailMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
+      byte marker;
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                TimePageReader timePageReader =
+                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+                timeBatch.add(timePageReader.getNextTimeBatch());
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                ValuePageReader valuePageReader =
+                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+                TsPrimitiveType[] valueBatch =
+                    valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+              } else { // NonAligned Chunk
+                PageReader pageReader =
+                    new PageReader(
+                        pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+                BatchData batchData = pageReader.getAllSatisfiedPageData();
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      Assert.fail("Meet exception when checking integrity of tsfile: " + e.getMessage());
+    }
+  }
+
+  /**
+   * This method checks the integrity of the file by mimicking the process of the query, which reads
+   * the metadata index tree first, and get the timeseries metadata list and chunk metadata list.
+   * After that, this method acquires single chunk according to chunk metadata, then it deserializes
+   * the chunk, and verifies the correctness of the data.
+   *
+   * @param filename File to be checked
+   * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>,
+   *     each inner list stands for a chunk.
+   */
+  public static void checkIntegrityByQuery(
+      String filename,
+      Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+          reader.getAllTimeseriesMetadata(true);
+      Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+      // check each series
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+        String deviceId = entry.getKey();
+        List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+        boolean vectorMode = false;
+        if (timeseriesMetadataList.size() > 0
+            && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) {
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+        } else {
+          vectorMode = true;
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1);
+        }
+
+        if (!vectorMode) {
+          // check integrity of not aligned series
+          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+            // get its chunk metadata list, and read the chunk
+            String measurementId = timeseriesMetadata.getMeasurementId();
+            List<List<Pair<Long, TsPrimitiveType>>> originChunks =
+                originData.get(deviceId).get(measurementId);
+            List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+            Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+            chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+            for (int i = 0; i < chunkMetadataList.size(); ++i) {
+              Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+              ChunkReader chunkReader = new ChunkReader(chunk, null);
+              List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i);
+              // deserialize the chunk and verify it with origin data
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNextTimeValuePair()) {
+                  TimeValuePair pair = pointReader.nextTimeValuePair();
+                  Assert.assertEquals(
+                      originValue.get(valIdx).left.longValue(), pair.getTimestamp());
+                  Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue());
+                }
+              }
+            }
+          }
+        } else {
+          // check integrity of vector type
+          // get the timeseries metadata of the time column
+          TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0);
+          List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList();
+          timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+
+          for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+            // traverse each value column
+            List<IChunkMetadata> valueChunkMetadataList =
+                timeseriesMetadataList.get(i).getChunkMetadataList();
+            Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size());
+            List<List<Pair<Long, TsPrimitiveType>>> originDataChunks =
+                originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId());
+            for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) {
+              Chunk timeChunk =
+                  reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx));
+              Chunk valueChunk =
+                  reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx));
+              // construct an aligned chunk reader using time chunk and value chunk
+              IChunkReader chunkReader =
+                  new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null);
+              // verify the values
+              List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx);
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNext()) {
+                  long time = pointReader.currentTime();
+                  Assert.assertEquals(originValue.get(valIdx).left.longValue(), time);
+                  Assert.assertEquals(
+                      originValue.get(valIdx++).right.getValue(),
+                      ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue());
+                  pointReader.next();
+                }
+              }
+            }
+          }
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      Assert.fail("Meet exception when checking integrity of tsfile: " + e.getMessage());
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
new file mode 100644
index 0000000000..b7c6ff84db
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TsFileIOWriterMemoryControlTest {
+  // TsFile written by the tests below; deleted again in tearDown().
+  private static File testFile = new File("target", "1-1-0-0.tsfile");
+  // Scratch file under target/; deleted in tearDown() if it exists.
+  private static File emptyFile = new File("target", "temp");
+  // Multiplied by the chunk index to derive each chunk's start time; reset to 1000 in setUp().
+  // Presumably also the number of points per generated chunk -- TODO confirm in generate*Data.
+  private long TEST_CHUNK_SIZE = 1000;
+  // "s0".."s2047" sorted in String order by setUp(), so index 2 holds "s10" rather than "s2".
+  private List<String> sortedSeriesId = new ArrayList<>();
+  // "root.sg.d0".."root.sg.d2047" sorted in String order by setUp().
+  private List<String> sortedDeviceId = new ArrayList<>();
+  // Guards the one-time population of the two id lists in setUp().
+  private boolean init = false;
+
+  /**
+   * Resets TEST_CHUNK_SIZE to 1000 and, the first time it runs on this instance, fills the series
+   * and device id lists with 2048 entries each and sorts them in String order.
+   */
+  @Before
+  public void setUp() throws IOException {
+    TEST_CHUNK_SIZE = 1000;
+    if (init) {
+      // id lists were already built for this instance
+      return;
+    }
+    init = true;
+    for (int idx = 0; idx < 2048; ++idx) {
+      sortedSeriesId.add("s" + idx);
+      sortedDeviceId.add("root.sg.d" + idx);
+    }
+    sortedSeriesId.sort(String::compareTo);
+    sortedDeviceId.sort(String::compareTo);
+  }
+
+  /**
+   * Deletes the test TsFile, its chunk metadata temp file and the scratch file, in that order,
+   * skipping any that do not exist.
+   */
+  @After
+  public void tearDown() throws IOException {
+    File chunkMetadataTempFile =
+        new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    for (File leftover : new File[] {testFile, chunkMetadataTempFile, emptyFile}) {
+      if (leftover.exists()) {
+        FileUtils.delete(leftover);
+      }
+    }
+  }
+
+  // The following tests are for ChunkMetadata serialization and deserialization.
+
+  /**
+   * Writes 10 devices with 5 non-aligned series each (int, boolean, float, double and text, one
+   * chunk per series), flushes the chunk metadata with sortAndFlushChunkMetadata(), and reads it
+   * back through the on-disk TSMIterator. Every returned TimeseriesMetadata must carry the
+   * expected measurement id and the data type and statistics of the corresponding original chunk
+   * metadata, and the iterator must return exactly one entry per original chunk metadata.
+   *
+   * @throws IOException if writing or reading the chunk metadata fails
+   */
+  @Test
+  public void testSerializeAndDeserializeChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+              break;
+            case 1:
+              chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+              break;
+            case 2:
+              chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+              break;
+            case 3:
+              chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+              break;
+            case 4:
+            default:
+              chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+              break;
+          }
+          chunkWriter.writeToFileWriter(writer);
+        }
+        // snapshot the metadata of this chunk group before endChunkGroup() is called
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile,
+              writer.chunkGroupMetadataList,
+              writer.endPosInCMTForDevice);
+      int verifiedCount = 0;
+      while (iterator.hasNext()) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        // fail with a clear message instead of an IndexOutOfBoundsException on extra entries
+        Assert.assertTrue(
+            "iterator returned more entries than were written",
+            verifiedCount < originChunkMetadataList.size());
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(
+            sortedSeriesId.get(verifiedCount % 5), timeseriesMetadata.getMeasurementId());
+        Assert.assertEquals(
+            originChunkMetadataList.get(verifiedCount).getDataType(),
+            timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(verifiedCount).getStatistics(),
+            timeseriesMetadata.getStatistics());
+        ++verifiedCount;
+      }
+      // without this check the loop above would pass vacuously on an empty or short iterator
+      Assert.assertEquals(originChunkMetadataList.size(), verifiedCount);
+    }
+  }
+
+  /**
+   * Writes one aligned chunk (built by generateVectorData with 6 value series) for each of 10
+   * devices, flushes the chunk metadata with sortAndFlushChunkMetadata(), and reads it back
+   * through the on-disk TSMIterator. For every device the expected paths are "device." followed
+   * by "device.s1" to "device.s6"; each returned entry must match the expected path and the data
+   * type and statistics of the corresponding original chunk metadata, and the iterator must
+   * return exactly one entry per expected path.
+   *
+   * @throws IOException if writing or reading the chunk metadata fails
+   */
+  @Test
+  public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+        chunkWriter.writeToFileWriter(writer);
+        // snapshot the metadata of this chunk group before endChunkGroup() is called
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      List<String> measurementIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        measurementIds.add(sortedDeviceId.get(i) + ".");
+        for (int j = 1; j <= 6; ++j) {
+          measurementIds.add(sortedDeviceId.get(i) + ".s" + j);
+        }
+      }
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      int verifiedCount = 0;
+      while (iterator.hasNext()) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        // fail with a clear message instead of an IndexOutOfBoundsException on extra entries
+        Assert.assertTrue(
+            "iterator returned more entries than were written",
+            verifiedCount < measurementIds.size());
+        String fullPath = timeseriesMetadataPair.left;
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(measurementIds.get(verifiedCount), fullPath);
+        Assert.assertEquals(
+            originChunkMetadataList.get(verifiedCount).getDataType(),
+            timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(verifiedCount).getStatistics(),
+            timeseriesMetadata.getStatistics());
+        ++verifiedCount;
+      }
+      // without this check the loop above would pass vacuously on an empty or short iterator
+      Assert.assertEquals(measurementIds.size(), verifiedCount);
+    }
+  }
+
+  /**
+   * Writes 10 devices, alternating between 5 non-aligned series (even device index) and one
+   * aligned chunk with 6 value series (odd device index), flushes the chunk metadata with
+   * sortAndFlushChunkMetadata(), and reads it back through the on-disk TSMIterator. Every
+   * returned entry must match the expected path and the data type and statistics of the
+   * corresponding original chunk metadata, and the iterator must be exhausted afterwards.
+   *
+   * @throws IOException if writing or reading the chunk metadata fails
+   */
+  @Test
+  public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      List<String> seriesIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        if (i % 2 == 0) {
+          // write normal series
+          for (int j = 0; j < 5; ++j) {
+            ChunkWriterImpl chunkWriter;
+            switch (j) {
+              case 0:
+                chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+                break;
+              case 2:
+                chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            seriesIds.add(deviceId + "." + sortedSeriesId.get(j));
+          }
+        } else {
+          // write vector
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+          chunkWriter.writeToFileWriter(writer);
+          // expected paths of an aligned group: the time column first, then s1..s6
+          seriesIds.add(deviceId + ".");
+          for (int l = 1; l <= 6; ++l) {
+            seriesIds.add(deviceId + ".s" + l);
+          }
+        }
+        // snapshot the metadata of this chunk group before endChunkGroup() is called
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      for (int i = 0; i < originChunkMetadataList.size(); ++i) {
+        // a short iterator should fail as an assertion, not inside next()
+        Assert.assertTrue("iterator returned fewer entries than were written", iterator.hasNext());
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left);
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(),
+            timeseriesMetadataPair.right.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(),
+            timeseriesMetadataPair.right.getStatistics());
+      }
+      // nothing beyond the written series may be returned
+      Assert.assertFalse("iterator returned more entries than were written", iterator.hasNext());
+    }
+  }
+
+  /** The following tests are for writing normal (non-aligned) series in different quantities. */
+
+  /**
+   * Writes a file with 10 devices and 5 series in each device, one chunk per series, and calls
+   * checkMetadataSizeAndMayFlush() after every chunk. The test asserts that the writer reports
+   * chunk metadata on disk before endFile(), that the chunk metadata temp file no longer exists
+   * afterwards, and that the file passes both checks of TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 10; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        for (int seriesIdx = 0; seriesIdx < 5; ++seriesIdx) {
+          List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+          // one generator per data type, selected by the series index
+          ChunkWriterImpl chunkWriter;
+          if (seriesIdx == 0) {
+            chunkWriter = generateIntData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 1) {
+            chunkWriter = generateBooleanData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 2) {
+            chunkWriter = generateFloatData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 3) {
+            chunkWriter = generateDoubleData(seriesIdx, 0L, points);
+          } else {
+            chunkWriter = generateTextData(seriesIdx, 0L, points);
+          }
+          chunkWriter.writeToFileWriter(writer);
+          writer.checkMetadataSizeAndMayFlush();
+          // remember what was written so that the query check can compare against it
+          originData
+              .computeIfAbsent(deviceId, dev -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(seriesIdx), series -> new ArrayList<>())
+              .add(points);
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    File chunkMetadataTempFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(chunkMetadataTempFile.exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+   * and call checkMetadataSizeAndMayFlush() once after all chunks of the series are written. The
+   * test asserts that the writer reports chunk metadata on disk before endFile(), that the chunk
+   * metadata temp file no longer exists afterwards, and that the file passes both checks of
+   * TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          // every series gets 10 chunks; only the generator depends on the series index
+          for (int k = 0; k < 10; ++k) {
+            List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+            long startTime = TEST_CHUNK_SIZE * k;
+            ChunkWriterImpl chunkWriter;
+            switch (j) {
+              case 0:
+                chunkWriter = generateIntData(j, startTime, valList);
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(j, startTime, valList);
+                break;
+              case 2:
+                chunkWriter = generateFloatData(j, startTime, valList);
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(j, startTime, valList);
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(j, startTime, valList);
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // remember what was written so that the query check can compare against it
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                .add(valList);
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+   * for it. checkMetadataSizeAndMayFlush() is skipped for the last device, and the test asserts
+   * that chunkGroupMetadataList is not empty when endFile() is called, i.e. some chunk metadata
+   * is still held in memory while the writer also reports chunk metadata on disk.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          // every series gets 10 chunks; only the generator depends on the series index
+          for (int k = 0; k < 10; ++k) {
+            List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+            long startTime = TEST_CHUNK_SIZE * k;
+            ChunkWriterImpl chunkWriter;
+            switch (j) {
+              case 0:
+                chunkWriter = generateIntData(j, startTime, valList);
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(j, startTime, valList);
+                break;
+              case 2:
+                chunkWriter = generateFloatData(j, startTime, valList);
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(j, startTime, valList);
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(j, startTime, valList);
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // remember what was written so that the query check can compare against it
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                .add(valList);
+          }
+          // leave the last device's metadata unflushed so that it is still in memory at endFile()
+          if (i < 9) {
+            writer.checkMetadataSizeAndMayFlush();
+          }
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Writes a file with 2 devices and 5 series in each device, 1024 chunks per series, with
+   * TEST_CHUNK_SIZE temporarily set to 10. checkMetadataSizeAndMayFlush() is called once per
+   * series. The test asserts that the writer reports chunk metadata on disk before endFile(),
+   * that the chunk metadata temp file no longer exists afterwards, and that the file passes both
+   * checks of TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 2; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        for (int seriesIdx = 0; seriesIdx < 5; ++seriesIdx) {
+          for (int chunkIdx = 0; chunkIdx < 1024; ++chunkIdx) {
+            List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+            long startTime = TEST_CHUNK_SIZE * chunkIdx;
+            // the series index decides which data type is generated
+            ChunkWriterImpl chunkWriter;
+            switch (seriesIdx) {
+              case 0:
+                chunkWriter = generateIntData(seriesIdx, startTime, points);
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(seriesIdx, startTime, points);
+                break;
+              case 2:
+                chunkWriter = generateFloatData(seriesIdx, startTime, points);
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(seriesIdx, startTime, points);
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(seriesIdx, startTime, points);
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // remember what was written so that the query check can compare against it
+            originData
+                .computeIfAbsent(deviceId, dev -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(seriesIdx), series -> new ArrayList<>())
+                .add(points);
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    File chunkMetadataTempFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(chunkMetadataTempFile.exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Writes a file with 2 devices and 1024 series in each device, 50 chunks per series, with
+   * TEST_CHUNK_SIZE temporarily set to 1. The data type of a series is chosen by its index modulo
+   * 5, and checkMetadataSizeAndMayFlush() is called once per series. The test asserts that the
+   * writer reports chunk metadata on disk before endFile(), that the chunk metadata temp file no
+   * longer exists afterwards, and that the file passes both checks of
+   * TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 1;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 2; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        for (int seriesIdx = 0; seriesIdx < 1024; ++seriesIdx) {
+          // cycle through the five data types across the 1024 series
+          int typeSelector = seriesIdx % 5;
+          for (int chunkIdx = 0; chunkIdx < 50; ++chunkIdx) {
+            List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+            long startTime = TEST_CHUNK_SIZE * chunkIdx;
+            ChunkWriterImpl chunkWriter;
+            switch (typeSelector) {
+              case 0:
+                chunkWriter = generateIntData(seriesIdx, startTime, points);
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(seriesIdx, startTime, points);
+                break;
+              case 2:
+                chunkWriter = generateFloatData(seriesIdx, startTime, points);
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(seriesIdx, startTime, points);
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(seriesIdx, startTime, points);
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // remember what was written so that the query check can compare against it
+            originTimes
+                .computeIfAbsent(deviceId, dev -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(seriesIdx), series -> new ArrayList<>())
+                .add(points);
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    File chunkMetadataTempFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(chunkMetadataTempFile.exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /**
+   * Writes a file with 1024 devices and 5 series in each device, 10 chunks per series, with
+   * TEST_CHUNK_SIZE temporarily set to 10. checkMetadataSizeAndMayFlush() is called once per
+   * series. The test asserts that the writer reports chunk metadata on disk before endFile(),
+   * that the chunk metadata temp file no longer exists afterwards, and that the file passes both
+   * checks of TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 1024; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        for (int seriesIdx = 0; seriesIdx < 5; ++seriesIdx) {
+          int typeSelector = seriesIdx % 5;
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+            long startTime = TEST_CHUNK_SIZE * chunkIdx;
+            // the series index decides which data type is generated
+            ChunkWriterImpl chunkWriter;
+            switch (typeSelector) {
+              case 0:
+                chunkWriter = generateIntData(seriesIdx, startTime, points);
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(seriesIdx, startTime, points);
+                break;
+              case 2:
+                chunkWriter = generateFloatData(seriesIdx, startTime, points);
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(seriesIdx, startTime, points);
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(seriesIdx, startTime, points);
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // remember what was written so that the query check can compare against it
+            originTimes
+                .computeIfAbsent(deviceId, dev -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(seriesIdx), series -> new ArrayList<>())
+                .add(points);
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    File chunkMetadataTempFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(chunkMetadataTempFile.exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /** The following tests are for writing aligned series. */
+
+  /**
+   * Writes one aligned chunk holding 6 series for each of the first 10 entries of {@code
+   * sortedDeviceId}, invoking the metadata size check after every chunk group, and then validates
+   * the sealed file.
+   *
+   * <p>The test asserts that {@code hasChunkMetadataInDisk} is set after {@code endFile()}, that
+   * the temporary chunk-metadata file does not exist once the writer has been closed, and that
+   * both integrity checks accept the file.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+    final int deviceCount = 10;
+    final int seriesPerDevice = 6;
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> expected = new HashMap<>();
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < deviceCount; ++deviceIdx) {
+        String device = sortedDeviceId.get(deviceIdx);
+        ioWriter.startChunkGroup(device);
+        List<List<Pair<Long, TsPrimitiveType>>> generated = new ArrayList<>();
+        AlignedChunkWriterImpl alignedWriter = generateVectorData(0L, generated, seriesPerDevice);
+        // Remember the generated points so the query-based check can compare against them.
+        Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+            expected.computeIfAbsent(device, key -> new HashMap<>());
+        for (int col = 0; col < seriesPerDevice; ++col) {
+          deviceData
+              .computeIfAbsent("s" + (col + 1), key -> new ArrayList<>())
+              .add(generated.get(col));
+        }
+
+        alignedWriter.writeToFileWriter(ioWriter);
+        ioWriter.endChunkGroup();
+        ioWriter.checkMetadataSizeAndMayFlush();
+      }
+      ioWriter.endFile();
+      Assert.assertTrue(ioWriter.hasChunkMetadataInDisk);
+    }
+    File tempMetadataFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(tempMetadataFile.exists());
+    String tsFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(tsFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(tsFilePath, expected);
+  }
+
+  /**
+   * Writes 512 chunk groups for a single device, each holding one aligned chunk of 6 series, and
+   * then validates the sealed file.
+   *
+   * <p>The metadata size check is invoked once, after all chunk groups have been written. The test
+   * asserts that {@code hasChunkMetadataInDisk} is set after {@code endFile()} and that the
+   * temporary chunk-metadata file does not exist once the writer has been closed.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+    final int chunkCount = 512;
+    final int seriesCount = 6;
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> expected = new HashMap<>();
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(testFile, true, 1024)) {
+      // Only the first device is used by this test.
+      String device = sortedDeviceId.get(0);
+      for (int chunkIdx = 0; chunkIdx < chunkCount; ++chunkIdx) {
+        ioWriter.startChunkGroup(device);
+        List<List<Pair<Long, TsPrimitiveType>>> generated = new ArrayList<>();
+        AlignedChunkWriterImpl alignedWriter =
+            generateVectorData(chunkIdx * TEST_CHUNK_SIZE, generated, seriesCount);
+        Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+            expected.computeIfAbsent(device, key -> new HashMap<>());
+        for (int col = 0; col < seriesCount; ++col) {
+          deviceData
+              .computeIfAbsent("s" + (col + 1), key -> new ArrayList<>())
+              .add(generated.get(col));
+        }
+
+        alignedWriter.writeToFileWriter(ioWriter);
+        ioWriter.endChunkGroup();
+      }
+      ioWriter.checkMetadataSizeAndMayFlush();
+      ioWriter.endFile();
+      Assert.assertTrue(ioWriter.hasChunkMetadataInDisk);
+    }
+    File tempMetadataFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(tempMetadataFile.exists());
+    String tsFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(tsFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(tsFilePath, expected);
+  }
+
+  /**
+   * Writes aligned chunks with 1024 series each: 5 chunk groups for each of the first 10 devices,
+   * with {@code TEST_CHUNK_SIZE} temporarily lowered to 10 points per chunk.
+   *
+   * <p>The metadata size check is invoked once per device. {@code TEST_CHUNK_SIZE} is restored in
+   * a {@code finally} clause before the file is validated.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+    final int deviceCount = 10;
+    final int chunkCount = 5;
+    final int seriesCount = 1024;
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> expected = new HashMap<>();
+    long savedChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < deviceCount; ++deviceIdx) {
+        String device = sortedDeviceId.get(deviceIdx);
+        for (int chunkIdx = 0; chunkIdx < chunkCount; ++chunkIdx) {
+          ioWriter.startChunkGroup(device);
+          List<List<Pair<Long, TsPrimitiveType>>> generated = new ArrayList<>();
+          AlignedChunkWriterImpl alignedWriter =
+              generateVectorData(chunkIdx * TEST_CHUNK_SIZE, generated, seriesCount);
+          Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+              expected.computeIfAbsent(device, key -> new HashMap<>());
+          for (int col = 0; col < seriesCount; ++col) {
+            deviceData
+                .computeIfAbsent("s" + (col + 1), key -> new ArrayList<>())
+                .add(generated.get(col));
+          }
+
+          alignedWriter.writeToFileWriter(ioWriter);
+          ioWriter.endChunkGroup();
+        }
+        ioWriter.checkMetadataSizeAndMayFlush();
+      }
+      ioWriter.endFile();
+      Assert.assertTrue(ioWriter.hasChunkMetadataInDisk);
+    } finally {
+      // Runs after the writer has been closed; undo the temporary chunk size.
+      TEST_CHUNK_SIZE = savedChunkSize;
+    }
+    File tempMetadataFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(tempMetadataFile.exists());
+    String tsFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(tsFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(tsFilePath, expected);
+  }
+
+  /**
+   * Writes aligned chunks of 12 series for 1024 devices, 5 chunk groups per device, with {@code
+   * TEST_CHUNK_SIZE} temporarily lowered to 10 points per chunk.
+   *
+   * <p>The metadata size check is invoked once per device. {@code TEST_CHUNK_SIZE} is restored in
+   * a {@code finally} clause before the file is validated.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+    final int deviceCount = 1024;
+    final int chunkCount = 5;
+    final int seriesCount = 12;
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> expected = new HashMap<>();
+    long savedChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < deviceCount; ++deviceIdx) {
+        String device = sortedDeviceId.get(deviceIdx);
+        for (int chunkIdx = 0; chunkIdx < chunkCount; ++chunkIdx) {
+          ioWriter.startChunkGroup(device);
+          List<List<Pair<Long, TsPrimitiveType>>> generated = new ArrayList<>();
+          AlignedChunkWriterImpl alignedWriter =
+              generateVectorData(chunkIdx * TEST_CHUNK_SIZE, generated, seriesCount);
+          Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+              expected.computeIfAbsent(device, key -> new HashMap<>());
+          for (int col = 0; col < seriesCount; ++col) {
+            deviceData
+                .computeIfAbsent("s" + (col + 1), key -> new ArrayList<>())
+                .add(generated.get(col));
+          }
+
+          alignedWriter.writeToFileWriter(ioWriter);
+          ioWriter.endChunkGroup();
+        }
+        ioWriter.checkMetadataSizeAndMayFlush();
+      }
+      ioWriter.endFile();
+      Assert.assertTrue(ioWriter.hasChunkMetadataInDisk);
+    } finally {
+      // Runs after the writer has been closed; undo the temporary chunk size.
+      TEST_CHUNK_SIZE = savedChunkSize;
+    }
+    File tempMetadataFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(tempMetadataFile.exists());
+    String tsFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(tsFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(tsFilePath, expected);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 1024; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  @Test
+  public void testWritingCompleteMixedFiles() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < 10; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+          for (int j = 1; j <= 6; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      for (int i = 5; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Writes aligned data column by column for 5 devices: one time chunk of {@code TEST_CHUNK_SIZE}
+   * timestamps followed by one DOUBLE value chunk for each of 5 series.
+   *
+   * <p>Chunk metadata is flushed explicitly after the time column and after every value column;
+   * the test asserts {@code hasChunkMetadataInDisk} right after the first flush of each device.
+   *
+   * @throws IOException if writing or checking the file fails
+   */
+  @Test
+  public void testWritingAlignedSeriesByColumn() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> expected = new HashMap<>();
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 5; deviceIdx++) {
+        String device = sortedDeviceId.get(deviceIdx);
+        ioWriter.startChunkGroup(device);
+
+        // Time column, encoded with the configured time encoder and data type.
+        TSEncoding configuredTimeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType configuredTimeType =
+            TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder timeEncoder =
+            TSEncodingBuilder.getEncodingBuilder(configuredTimeEncoding)
+                .getEncoder(configuredTimeType);
+        TimeChunkWriter timeColumn =
+            new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, timeEncoder);
+        for (int time = 0; time < TEST_CHUNK_SIZE; ++time) {
+          timeColumn.write(time);
+        }
+        timeColumn.writeToFileWriter(ioWriter);
+        ioWriter.sortAndFlushChunkMetadata();
+        Assert.assertTrue(ioWriter.hasChunkMetadataInDisk);
+
+        // Value columns, one series at a time.
+        for (int seriesIdx = 0; seriesIdx < 5; ++seriesIdx) {
+          TSEncodingBuilder plainBuilder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          plainBuilder.initFromProps(null);
+          ValueChunkWriter valueColumn =
+              new ValueChunkWriter(
+                  sortedSeriesId.get(seriesIdx),
+                  CompressionType.SNAPPY,
+                  TSDataType.DOUBLE,
+                  TSEncoding.PLAIN,
+                  plainBuilder.getEncoder(TSDataType.DOUBLE));
+          Random rng = new Random();
+          List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+          for (int time = 0; time < TEST_CHUNK_SIZE; ++time) {
+            double value = rng.nextDouble();
+            valueColumn.write(time, value, false);
+            points.add(new Pair<>((long) time, new TsPrimitiveType.TsDouble(value)));
+          }
+          valueColumn.writeToFileWriter(ioWriter);
+          expected
+              .computeIfAbsent(device, key -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(seriesIdx), key -> new ArrayList<>())
+              .add(points);
+          ioWriter.sortAndFlushChunkMetadata();
+        }
+        ioWriter.endChunkGroup();
+      }
+      ioWriter.endFile();
+    }
+    File tempMetadataFile =
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    Assert.assertFalse(tempMetadataFile.exists());
+    String tsFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(tsFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(tsFilePath, expected);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  /**
+   * Builds a chunk writer for the INT64 series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random long values at consecutive timestamps.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record receives every generated (timestamp, value) pair, in write order
+   * @return the filled chunk writer
+   */
+  private ChunkWriterImpl generateIntData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64);
+    ChunkWriterImpl filledWriter = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      long value = rng.nextLong();
+      filledWriter.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsLong(value)));
+    }
+    return filledWriter;
+  }
+
+  /**
+   * Builds a chunk writer for the FLOAT series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random float values at consecutive timestamps.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record receives every generated (timestamp, value) pair, in write order
+   * @return the filled chunk writer
+   */
+  private ChunkWriterImpl generateFloatData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT);
+    ChunkWriterImpl filledWriter = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      float value = rng.nextFloat();
+      filledWriter.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsFloat(value)));
+    }
+    return filledWriter;
+  }
+
+  /**
+   * Builds a chunk writer for the DOUBLE series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random double values at consecutive timestamps.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record receives every generated (timestamp, value) pair, in write order
+   * @return the filled chunk writer
+   */
+  private ChunkWriterImpl generateDoubleData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE);
+    ChunkWriterImpl filledWriter = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      double value = rng.nextDouble();
+      filledWriter.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsDouble(value)));
+    }
+    return filledWriter;
+  }
+
+  /**
+   * Builds a chunk writer for the BOOLEAN series {@code sortedSeriesId.get(idx)} and fills it
+   * with {@code TEST_CHUNK_SIZE} random boolean values at consecutive timestamps.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record receives every generated (timestamp, value) pair, in write order
+   * @return the filled chunk writer
+   */
+  private ChunkWriterImpl generateBooleanData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN);
+    ChunkWriterImpl filledWriter = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      boolean value = rng.nextBoolean();
+      filledWriter.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsBoolean(value)));
+    }
+    return filledWriter;
+  }
+
+  /**
+   * Builds an aligned chunk writer over {@code seriesNum} series named {@code s1}, {@code s2}, ...
+   * and fills it with {@code TEST_CHUNK_SIZE} rows of random values at consecutive timestamps.
+   *
+   * <p>Series data types cycle through INT32, INT64, FLOAT, DOUBLE, BOOLEAN and TEXT in that
+   * order. One new list per series is appended to {@code record}, and the lists are then
+   * addressed by index from 0, so {@code record} is expected to be empty on entry.
+   *
+   * @param startTime timestamp of the first generated row
+   * @param record receives, per series, the generated (timestamp, value) pairs in write order
+   * @param seriesNum number of series in the aligned chunk
+   * @return the filled aligned chunk writer
+   */
+  private AlignedChunkWriterImpl generateVectorData(
+      long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+    TSDataType[] typeCycle = {
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.BOOLEAN,
+      TSDataType.TEXT
+    };
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    for (int col = 0; col < seriesNum; ++col) {
+      schemas.add(new MeasurementSchema("s" + (col + 1), typeCycle[col % typeCycle.length]));
+    }
+    AlignedChunkWriterImpl filledWriter = new AlignedChunkWriterImpl(schemas);
+    Random rng = new Random();
+    for (int col = 0; col < seriesNum; ++col) {
+      record.add(new ArrayList<>());
+    }
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      TsPrimitiveType[] row = new TsPrimitiveType[seriesNum];
+      for (int col = 0; col < seriesNum; ++col) {
+        // The value type follows the same six-type cycle as the schemas above.
+        switch (col % 6) {
+          case 0:
+            row[col] = new TsPrimitiveType.TsInt(rng.nextInt());
+            break;
+          case 1:
+            row[col] = new TsPrimitiveType.TsLong(rng.nextLong());
+            break;
+          case 2:
+            row[col] = new TsPrimitiveType.TsFloat(rng.nextFloat());
+            break;
+          case 3:
+            row[col] = new TsPrimitiveType.TsDouble(rng.nextDouble());
+            break;
+          case 4:
+            row[col] = new TsPrimitiveType.TsBoolean(rng.nextBoolean());
+            break;
+          case 5:
+            row[col] = new TsPrimitiveType.TsBinary(new Binary(String.valueOf(rng.nextDouble())));
+            break;
+        }
+        record.get(col).add(new Pair<>(time, row[col]));
+      }
+      filledWriter.write(time, row);
+    }
+    return filledWriter;
+  }
+
+  /**
+   * Builds a chunk writer for the TEXT series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} values at consecutive timestamps, each value being the string form of
+   * a random double.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record receives every generated (timestamp, value) pair, in write order
+   * @return the filled chunk writer
+   */
+  private ChunkWriterImpl generateTextData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT);
+    ChunkWriterImpl filledWriter = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; ++time) {
+      Binary value = new Binary(String.valueOf(rng.nextDouble()));
+      filledWriter.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsBinary(value)));
+    }
+    return filledWriter;
+  }
+}