Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/04 09:45:32 UTC

[iotdb] 01/01: control metadata size in TsFileIOWriter

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4517
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b873fed67b30b0c4b1e9185b9428fb962e34e32c
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Oct 4 17:45:11 2022 +0800

    control metadata size in TsFileIOWriter
---
 .../resources/conf/iotdb-engine.properties         |    8 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   21 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   10 +
 .../db/engine/compaction/CompactionUtils.java      |   79 +-
 .../cross/rewrite/task/SubCompactionTask.java      |    5 +-
 .../utils/AlignedSeriesCompactionExecutor.java     |    1 +
 .../inner/utils/InnerSpaceCompactionUtils.java     |   12 +-
 .../utils/SingleSeriesCompactionExecutor.java      |    1 +
 .../writer/AbstractCompactionWriter.java           |    7 +
 .../writer/CrossSpaceCompactionWriter.java         |   47 +
 .../writer/InnerSpaceCompactionWriter.java         |   34 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   16 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |    9 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |    2 +-
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |    2 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |   12 +-
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |    8 +
 .../file/metadata/MetadataIndexConstructor.java    |   39 +-
 .../tsfile/file/metadata/MetadataIndexNode.java    |    2 +-
 .../iotdb/tsfile/file/metadata/TsFileMetadata.java |   13 +
 .../tsfile/file/metadata/enums/TSDataType.java     |    2 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |    5 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |   20 +-
 .../write/writer/RestorableTsFileIOWriter.java     |   12 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  359 ++++--
 .../write/writer/tsmiterator/DiskTSMIterator.java  |  132 ++
 .../write/writer/tsmiterator/TSMIterator.java      |  147 +++
 .../tsfile/write/MetadataIndexConstructorTest.java |    2 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |    2 +-
 .../tsfile/write/TsFileIntegrityCheckingTool.java  |  251 ++++
 .../writer/TsFileIOWriterMemoryControlTest.java    | 1303 ++++++++++++++++++++
 31 files changed, 2411 insertions(+), 152 deletions(-)
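
The core of the change is a new memory-controlled mode for TsFileIOWriter: when the
accumulated chunk metadata exceeds a configurable budget, it is spilled to a temporary
<file>.meta file and read back through a TSMIterator when the file is sealed. Below is a
minimal sketch of how a caller might use the new mode; the target path and budget are
hypothetical, and only the constructor, checkMetadataSizeAndMayFlush() and endFile() calls
are taken from this commit:

    import java.io.File;
    import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

    public class MemoryControlledWriterSketch {
      public static void main(String[] args) throws Exception {
        File target = new File("example.tsfile");   // hypothetical path
        long maxMetadataSize = 16 * 1024 * 1024L;    // hypothetical 16 MB budget
        // enableMemoryControl = true turns on spilling of chunk metadata to <file>.meta
        TsFileIOWriter writer = new TsFileIOWriter(target, true, maxMetadataSize);
        // ... write chunk groups and chunks as usual ...
        // spill chunk metadata to the temp file if it exceeds the budget
        writer.checkMetadataSizeAndMayFlush();
        // endFile() rebuilds the index tree from in-memory or on-disk chunk metadata
        writer.endFile();
      }
    }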

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e17a900a25..9f494b582c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -353,6 +353,10 @@ timestamp_precision=ms
 # Datatype: int
 # primitive_array_size=32
 
+# The proportion of write memory reserved for chunk metadata in a single file writer when flushing a memtable
+# Datatype: double
+# chunk_metadata_size_proportion_in_write=0.1
+
 # Ratio of write memory for invoking flush disk, 0.4 by default
 # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
 # Datatype: double
@@ -449,6 +453,10 @@ timestamp_precision=ms
 # BALANCE: alternate two compaction types
 # compaction_priority=BALANCE
 
+# The proportion of compaction memory reserved for chunk metadata kept in memory when compacting
+# Datatype: double
+# chunk_metadata_size_proportion_in_compaction=0.05
+
 # The target tsfile size in compaction
 # Datatype: long, Unit: byte
 # target_compaction_file_size=1073741824
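
As a worked example of the two new knobs (the memory figures below are assumptions for
illustration, not values taken from this commit): with
chunk_metadata_size_proportion_in_compaction=0.05, a compaction memory budget of 2 GB and
10 concurrent compaction threads, each compaction file writer may hold roughly
2 GB / 10 * 0.05 = 10 MB of chunk metadata before spilling to disk; with
chunk_metadata_size_proportion_in_write=0.1 and a memtable size threshold of 1 GB, each
flushing file writer gets about 1 GB * 0.1 = 100 MB. These formulas mirror the
sizeForFileWriter and RestorableTsFileIOWriter budget computations added below.
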
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f4cfc65920..c9151ffcf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,6 +128,8 @@ public class IoTDBConfig {
   /** The proportion of write memory for write process */
   private double writeProportion = 0.8;
 
+  private double chunkMetadataSizeProportionInWrite = 0.1;
+
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
@@ -398,6 +400,8 @@ public class IoTDBConfig {
    */
   private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
 
+  private double chunkMetadataSizeProportionInCompaction = 0.05;
+
   /** The target tsfile size in compaction, 1 GB by default */
   private long targetCompactionFileSize = 1073741824L;
 
@@ -2773,4 +2777,21 @@ public class IoTDBConfig {
   public void setCustomizedProperties(Properties customizedProperties) {
     this.customizedProperties = customizedProperties;
   }
+
+  public double getChunkMetadataSizeProportionInWrite() {
+    return chunkMetadataSizeProportionInWrite;
+  }
+
+  public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) {
+    this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite;
+  }
+
+  public double getChunkMetadataSizeProportionInCompaction() {
+    return chunkMetadataSizeProportionInCompaction;
+  }
+
+  public void setChunkMetadataSizeProportionInCompaction(
+      double chunkMetadataSizeProportionInCompaction) {
+    this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 815113f6b4..01606eb873 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -341,6 +341,11 @@ public class IoTDBDescriptor {
                 "max_waiting_time_when_insert_blocked",
                 Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
 
+    conf.setChunkMetadataSizeProportionInCompaction(
+        Double.parseDouble(
+            properties.getProperty(
+                "chunk_metadata_size_proportion_in_compaction",
+                Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
     conf.setEstimatedSeriesSize(
         Integer.parseInt(
             properties.getProperty(
@@ -928,6 +933,11 @@ public class IoTDBDescriptor {
         .setKerberosPrincipal(
             properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
     TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+    conf.setChunkMetadataSizeProportionInWrite(
+        Double.parseDouble(
+            properties.getProperty(
+                "chunk_metadata_size_proportion_in_write",
+                Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));
 
     // timed flush memtable, timed close tsfile
     loadTimedService(properties);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 97bcff78ff..e32930864e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -60,7 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -165,48 +165,51 @@ public class CompactionUtils {
       QueryContext queryContext,
       QueryDataSource queryDataSource)
       throws IOException, InterruptedException {
-    Map<String, MeasurementSchema> measurementSchemaMap =
-        deviceIterator.getAllSchemasOfCurrentDevice();
-    int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
-
-    // assign all measurements to different sub tasks
-    Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
-    int idx = 0;
-    for (String measurement : measurementSchemaMap.keySet()) {
-      if (measurementsForEachSubTask[idx % subTaskNums] == null) {
-        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
+    Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
+    List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
+    allMeasurements.sort((String::compareTo));
+    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+    // construct sub tasks and start compacting measurements in parallel
+    if (subTaskNums > 0) {
+      // assign the measurements for each subtask
+      List<String>[] measurementListArray = new List[subTaskNums];
+      for (int i = 0, size = allMeasurements.size(); i < size; ++i) {
+        int index = i % subTaskNums;
+        if (measurementListArray[index] == null) {
+          measurementListArray[index] = new LinkedList<>();
+        }
+        measurementListArray[index].add(allMeasurements.get(i));
       }
-      measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
-    }
 
-    // construct sub tasks and start compacting measurements in parallel
-    List<Future<Void>> futures = new ArrayList<>();
-    compactionWriter.startChunkGroup(device, false);
-    for (int i = 0; i < subTaskNums; i++) {
-      futures.add(
-          CompactionTaskManager.getInstance()
-              .submitSubTask(
-                  new SubCompactionTask(
-                      device,
-                      measurementsForEachSubTask[i],
-                      queryContext,
-                      queryDataSource,
-                      compactionWriter,
-                      measurementSchemaMap,
-                      i)));
-    }
+      // construct sub tasks and start compacting measurements in parallel
+      List<Future<Void>> futures = new ArrayList<>();
+      compactionWriter.startChunkGroup(device, false);
+      for (int i = 0; i < subTaskNums; i++) {
+        futures.add(
+            CompactionTaskManager.getInstance()
+                .submitSubTask(
+                    new SubCompactionTask(
+                        device,
+                        measurementListArray[i],
+                        queryContext,
+                        queryDataSource,
+                        compactionWriter,
+                        schemaMap,
+                        i)));
+      }
 
-    // wait for all sub tasks finish
-    for (int i = 0; i < subTaskNums; i++) {
-      try {
-        futures.get(i).get();
-      } catch (InterruptedException | ExecutionException e) {
-        logger.error("SubCompactionTask meet errors ", e);
-        Thread.interrupted();
-        throw new InterruptedException();
+      // wait for all sub tasks finish
+      for (int i = 0; i < subTaskNums; i++) {
+        try {
+          futures.get(i).get();
+        } catch (InterruptedException | ExecutionException e) {
+          logger.error("SubCompactionTask meet errors ", e);
+          Thread.interrupted();
+          throw new InterruptedException();
+        }
       }
     }
-
+    compactionWriter.checkAndMayFlushChunkMetadata();
     compactionWriter.endChunkGroup();
   }
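
For illustration (hypothetical measurement names, not from the commit): with the sorted
measurement list [s1, s2, s3, s4, s5] and subTaskNums = 2, the assignment loop above puts
[s1, s3, s5] into sub-task 0 and [s2, s4] into sub-task 1, i.e. measurement i goes to
sub-task i % subTaskNums, so each sub-task still processes its measurements in sorted order.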
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index c688deb8bd..f5d5278437 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 
 /**
@@ -45,7 +44,7 @@ public class SubCompactionTask implements Callable<Void> {
   private static final Logger logger =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private final String device;
-  private final Set<String> measurementList;
+  private final List<String> measurementList;
 
   private final QueryContext queryContext;
   private final QueryDataSource queryDataSource;
@@ -57,7 +56,7 @@ public class SubCompactionTask implements Callable<Void> {
 
   public SubCompactionTask(
       String device,
-      Set<String> measurementList,
+      List<String> measurementList,
       QueryContext queryContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index af5353153f..dbd5b98bec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -135,6 +135,7 @@ public class AlignedSeriesCompactionExecutor {
           chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
     }
+    writer.checkMetadataSizeAndMayFlush();
   }
 
   private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 06004c9c6e..a6248e9835 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -64,8 +65,17 @@ public class InnerSpaceCompactionUtils {
   public static void compact(TsFileResource targetResource, List<TsFileResource> tsFileResources)
       throws IOException, MetadataException, InterruptedException {
 
+    // the size budget for the file writer is 5% of the per-compaction-task memory budget
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
     try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(tsFileResources);
-        TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
+        TsFileIOWriter writer =
+            new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
       while (deviceIterator.hasNextDevice()) {
         Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
         String device = deviceInfo.left;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..d614b3dbe3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -138,6 +138,7 @@ public class SingleSeriesCompactionExecutor {
     }
     targetResource.updateStartTime(device, minStartTimestamp);
     targetResource.updateEndTime(device, maxEndTimestamp);
+    fileWriter.checkMetadataSizeAndMayFlush();
   }
 
   private void constructChunkWriterFromReadChunk(Chunk chunk) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 5c1460230d..72096069e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -177,4 +177,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   }
 
   public abstract List<TsFileIOWriter> getFileIOWriter();
+
+  public void checkAndMayFlushChunkMetadata() throws IOException {
+    List<TsFileIOWriter> writers = this.getFileIOWriter();
+    for (TsFileIOWriter writer : writers) {
+      writer.checkMetadataSizeAndMayFlush();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 3e245cfc35..3a413d4cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -27,6 +29,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // target fileIOWriters
@@ -34,6 +38,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
 
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
+  private List<TsFileResource> targetTsFileResources;
 
   // Each sub task has its corresponding seq file index.
   // The index of the array corresponds to subTaskId.
@@ -51,17 +56,46 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // current chunk group header size
   private int chunkGroupHeaderSize;
 
+  private AtomicLong[] startTimeForCurDeviceForEachFile;
+  private AtomicLong[] endTimeForCurDeviceForEachFile;
+  private AtomicBoolean[] hasCurDeviceForEachFile;
+  private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][];
+  private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][];
+
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
       throws IOException {
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
     isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    this.targetTsFileResources = targetResources;
+    startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+    hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()];
+    long memorySizeForEachWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction()
+                / targetResources.size());
     for (int i = 0; i < targetResources.size(); i++) {
       this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
       isEmptyFile[i] = true;
+      startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE);
+      hasCurDeviceForEachFile[i] = new AtomicBoolean(false);
     }
     this.seqTsFileResources = seqFileResources;
+    for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) {
+      startTimeForEachDevice[i] = new AtomicLong[size];
+      endTimeForEachDevice[i] = new AtomicLong[size];
+      for (int j = 0; j < size; ++j) {
+        startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE);
+        endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE);
+      }
+    }
   }
 
   @Override
@@ -86,6 +120,16 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
       }
       isDeviceExistedInTargetFiles[i] = false;
     }
+    for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) {
+      for (int j = 0; j < subTaskNum; ++j) {
+        targetTsFileResources
+            .get(i)
+            .updateStartTime(deviceId, startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE));
+        targetTsFileResources
+            .get(i)
+            .updateEndTime(deviceId, endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE));
+      }
+    }
     deviceId = null;
   }
 
@@ -99,6 +143,9 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   public void write(long timestamp, Object value, int subTaskId) throws IOException {
     checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
     writeDataPoint(timestamp, value, subTaskId);
+    int fileIndex = seqFileIndexArray[subTaskId];
+    startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::min);
+    endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::max);
     checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
     isEmptyFile[seqFileIndexArray[subTaskId]] = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index af2cc53c67..18fa51d7a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -18,21 +18,43 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   private TsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
+  private TsFileResource resource;
+  private AtomicLong[] startTimeOfCurDevice;
+  private AtomicLong[] endTimeOfCurDevice;
 
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
-    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
+    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter);
     isEmptyFile = true;
+    resource = targetFileResource;
+    int concurrentThreadNum =
+        Math.max(1, IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+    startTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    endTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+    for (int i = 0; i < concurrentThreadNum; ++i) {
+      startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE);
+      endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE);
+    }
   }
 
   @Override
@@ -44,6 +66,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
 
   @Override
   public void endChunkGroup() throws IOException {
+    for (int i = 0; i < startTimeOfCurDevice.length; ++i) {
+      resource.updateStartTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), startTimeOfCurDevice[i].get());
+      resource.updateEndTime(
+          fileWriter.getCurrentChunkGroupDeviceId(), endTimeOfCurDevice[i].get());
+      startTimeOfCurDevice[i].set(Long.MAX_VALUE);
+      endTimeOfCurDevice[i].set(Long.MIN_VALUE);
+    }
     fileWriter.endChunkGroup();
   }
 
@@ -57,6 +87,8 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
     writeDataPoint(timestamp, value, subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
     isEmptyFile = false;
+    startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(), timestamp));
+    endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(), timestamp));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 2fd5f4db64..5ab620f7fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -39,6 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -114,12 +117,14 @@ public class MemTableFlushTask {
     long start = System.currentTimeMillis();
     long sortTime = 0;
 
-    // for map do not use get(key) to iterate
-    for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry :
-        memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID()));
+    Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+    List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+    // sort the IDeviceIDs in lexicographical order
+    deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+    for (IDeviceID deviceID : deviceIDList) {
+      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID()));
 
-      final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
+      final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
@@ -275,6 +280,7 @@ public class MemTableFlushTask {
               this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
               this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
               this.writer.endChunkGroup();
+              writer.checkMetadataSizeAndMayFlush();
             } else {
               ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1cb31057e5..a56061935a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -172,7 +172,14 @@ public class TsFileProcessor {
     this.storageGroupName = storageGroupName;
     this.tsFileResource = new TsFileResource(tsfile, this);
     this.storageGroupInfo = storageGroupInfo;
-    this.writer = new RestorableTsFileIOWriter(tsfile);
+    this.writer =
+        new RestorableTsFileIOWriter(
+            tsfile,
+            (long)
+                (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                    * IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getChunkMetadataSizeProportionInWrite()));
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
     logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 8c26ec903c..6ec8e69fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1423,7 +1423,7 @@ public class PlanExecutor implements IPlanExecutor {
   private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
       throws MetadataException, QueryProcessException, IOException {
     Map<String, List<TimeseriesMetadata>> metadataSet =
-        tsFileSequenceReader.getAllTimeseriesMetadata();
+        tsFileSequenceReader.getAllTimeseriesMetadata(false);
     for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
       String deviceId = entry.getKey();
       PartialPath devicePath = new PartialPath(deviceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 5d61e6c304..6273add29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -69,7 +69,7 @@ public class FileLoaderUtils {
   public static void updateTsFileResource(
       TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException {
     for (Entry<String, List<TimeseriesMetadata>> entry :
-        reader.getAllTimeseriesMetadata().entrySet()) {
+        reader.getAllTimeseriesMetadata(false).entrySet()) {
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
         tsFileResource.updateStartTime(
             entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b7722b67dd..dd87894e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.writelog.recover;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -113,7 +114,14 @@ public class TsFileRecoverPerformer {
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
     try {
-      restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+      restorableTsFileIOWriter =
+          new RestorableTsFileIOWriter(
+              file,
+              (long)
+                  (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                      * IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getChunkMetadataSizeProportionInWrite()));
     } catch (NotCompatibleTsFileException e) {
       boolean result = file.delete();
       logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
@@ -180,7 +188,7 @@ public class TsFileRecoverPerformer {
     try (TsFileSequenceReader reader =
         new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) {
       for (Entry<String, List<TimeseriesMetadata>> entry :
-          reader.getAllTimeseriesMetadata().entrySet()) {
+          reader.getAllTimeseriesMetadata(false).entrySet()) {
         for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
           tsFileResource.updateStartTime(
               entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..9ee1f7f566 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata {
     return chunkMetaData;
   }
 
+  public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+    ChunkMetadata chunkMetadata = new ChunkMetadata();
+    chunkMetadata.tsDataType = dataType;
+    chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+    chunkMetadata.statistics = Statistics.deserialize(buffer, dataType);
+    return chunkMetadata;
+  }
+
   @Override
   public long getVersion() {
     return version;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..baa93ce9da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -123,7 +123,7 @@ public class MetadataIndexConstructor {
    * @param out tsfile output
    * @param type MetadataIndexNode type
    */
-  private static MetadataIndexNode generateRootNode(
+  public static MetadataIndexNode generateRootNode(
       Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
       throws IOException {
     int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +148,7 @@ public class MetadataIndexConstructor {
     return metadataIndexNodeQueue.poll();
   }
 
-  private static void addCurrentIndexNodeToQueue(
+  public static void addCurrentIndexNodeToQueue(
       MetadataIndexNode currentIndexNode,
       Queue<MetadataIndexNode> metadataIndexNodeQueue,
       TsFileOutput out)
@@ -156,4 +156,39 @@ public class MetadataIndexConstructor {
     currentIndexNode.setEndOffset(out.getPosition());
     metadataIndexNodeQueue.add(currentIndexNode);
   }
+
+  public static MetadataIndexNode checkAndBuildLevelIndex(
+      Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException {
+    // if the number of devices does not exceed the max child node count, skip the device-level
+    // index and point directly to the measurement index nodes
+    if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
+      MetadataIndexNode metadataIndexNode =
+          new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+        metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+        entry.getValue().serializeTo(out.wrapAsStream());
+      }
+      metadataIndexNode.setEndOffset(out.getPosition());
+      return metadataIndexNode;
+    }
+
+    // else, build level index for devices
+    Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
+    MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+
+    for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+      // when constructing from internal node, each node is related to an entry
+      if (currentIndexNode.isFull()) {
+        addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+        currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+      }
+      currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+      entry.getValue().serializeTo(out.wrapAsStream());
+    }
+    addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+    MetadataIndexNode deviceMetadataIndexNode =
+        generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
+    deviceMetadataIndexNode.setEndOffset(out.getPosition());
+    return deviceMetadataIndexNode;
+  }
 }
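
A worked example for checkAndBuildLevelIndex (assuming max_degree_of_index_node keeps its
usual value of 256; that default is an assumption here): a file with 100 devices stays in
the first branch and gets a single LEAF_DEVICE root whose entries point directly at each
device's measurement index, while a file with 1000 devices takes the second branch and is
packed into ceil(1000 / 256) = 4 LEAF_DEVICE nodes, which generateRootNode then groups
under one INTERNAL_DEVICE root.
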
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
     this.children.add(metadataIndexEntry);
   }
 
-  boolean isFull() {
+  public boolean isFull() {
     return children.size() >= config.getMaxDegreeOfIndexNode();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..7b70f54ebe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -119,6 +119,19 @@ public class TsFileMetadata {
     return byteLen;
   }
 
+  public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+      throws IOException {
+    int byteLen = 0;
+    byte[] bytes = filter.serialize();
+    byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
+    outputStream.write(bytes);
+    byteLen += bytes.length;
+    byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getSize(), outputStream);
+    byteLen +=
+        ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getHashFunctionSize(), outputStream);
+    return byteLen;
+  }
+
   /**
    * build bloom filter
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 48d9b2a38c..73ae05703c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -62,7 +62,7 @@ public enum TSDataType {
     return getTsDataType(type);
   }
 
-  private static TSDataType getTsDataType(byte type) {
+  public static TSDataType getTsDataType(byte type) {
     switch (type) {
       case 0:
         return TSDataType.BOOLEAN;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 90ec11349f..3a413b2f98 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -929,7 +929,8 @@ public class TsFileSequenceReader implements AutoCloseable {
   }
 
   /* TimeseriesMetadata don't need deserialize chunk metadata list */
-  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+      throws IOException {
     if (tsFileMetaData == null) {
       readFileMetadata();
     }
@@ -949,7 +950,7 @@ public class TsFileSequenceReader implements AutoCloseable {
           null,
           metadataIndexNode.getNodeType(),
           timeseriesMetadataMap,
-          false);
+          needChunkMetadata);
     }
     return timeseriesMetadataMap;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 49553e42d9..00536ca1dc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -22,7 +22,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -41,8 +47,15 @@ import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable {
@@ -413,7 +426,8 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
   }
 
   @Override
-  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+      throws IOException {
     if (tsFileMetaData == null) {
       readFileMetadata();
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 70a5d8cf9f..ae2afdb8e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     this(file, true);
   }
 
+  /**
+   * @param file a given tsfile path you want to (continue to) write
+   * @param maxMetadataSize the maximum memory occupied by chunk metadata in this writer before it
+   *     is flushed to a temporary file
+   * @throws IOException if write failed, or the file is broken but autoRepair==false.
+   */
+  public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    this(file, true);
+    this.maxMetadataSize = maxMetadataSize;
+    this.enableMemoryControl = true;
+    this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    this.checkMetadataSizeAndMayFlush();
+  }
+
   public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
     if (logger.isDebugEnabled()) {
       logger.debug("{} is opened.", file.getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f..7fdbac27df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,35 +26,47 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
 import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.TreeMap;
 
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
 /**
  * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
  */
@@ -93,6 +105,21 @@ public class TsFileIOWriter implements AutoCloseable {
   private long minPlanIndex;
   private long maxPlanIndex;
 
+  // the following variable is used for memory control
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total number of paths in order to build the bloom filter
+  protected int pathCount = 0;
+  protected boolean enableMemoryControl = false;
+  private Path lastSerializePath = null;
+  protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+  private volatile int chunkMetadataCount = 0;
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
+
   /** empty construct function. */
   protected TsFileIOWriter() {}
 
@@ -126,6 +153,15 @@ public class TsFileIOWriter implements AutoCloseable {
     this.out = output;
   }
 
+  /** for write with memory control */
+  public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+      throws IOException {
+    this(file);
+    this.enableMemoryControl = enableMemoryControl;
+    this.maxMetadataSize = maxMetadataSize;
+    chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total memory size exceeds the
    * chunk group size threshold.
@@ -236,6 +272,10 @@ public class TsFileIOWriter implements AutoCloseable {
 
   /** end chunk and write some log. */
   public void endCurrentChunk() {
+    if (enableMemoryControl) {
+      this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    }
+    chunkMetadataCount++;
     chunkMetadataList.add(currentChunkMetadata);
     currentChunkMetadata = null;
   }
@@ -247,47 +287,14 @@ public class TsFileIOWriter implements AutoCloseable {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void endFile() throws IOException {
-    long metaOffset = out.getPosition();
-
-    // serialize the SEPARATOR of MetaData
-    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
-    // group ChunkMetadata by series
-    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
-    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
-      List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
-      for (IChunkMetadata chunkMetadata : chunkMetadatas) {
-        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
-        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
-      }
-    }
-
-    MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
-    TsFileMetadata tsFileMetaData = new TsFileMetadata();
-    tsFileMetaData.setMetadataIndex(metadataIndex);
-    tsFileMetaData.setMetaOffset(metaOffset);
+    checkInMemoryPathCount();
+    readChunkMetadataAndConstructIndexTree();
 
     long footerIndex = out.getPosition();
     if (logger.isDebugEnabled()) {
       logger.debug("start to flush the footer,file pos:{}", footerIndex);
     }
 
-    // write TsFileMetaData
-    int size = tsFileMetaData.serializeTo(out.wrapAsStream());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
-    }
-
-    // write bloom filter
-    size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
-    }
-
-    // write TsFileMetaData size
-    ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
-
     // write magic string
     out.write(MAGIC_STRING_BYTES);
 
@@ -296,66 +303,121 @@ public class TsFileIOWriter implements AutoCloseable {
     if (resourceLogger.isDebugEnabled() && file != null) {
       resourceLogger.debug("{} writer is closed.", file.getName());
     }
+    if (file != null) {
+      File chunkMetadataFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+      if (chunkMetadataFile.exists()) {
+        FileUtils.delete(chunkMetadataFile);
+      }
+    }
     canWrite = false;
   }
 
-  /**
-   * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
-   *
-   * @param chunkMetadataListMap chunkMetadata that Path.mask == 0
-   * @return MetadataIndexEntry list in TsFileMetadata
-   */
-  private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
-      throws IOException {
-
-    // convert ChunkMetadataList to this field
-    deviceTimeseriesMetadataMap = new LinkedHashMap<>();
-    // create device -> TimeseriesMetaDataList Map
-    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      // for ordinary path
-      flushOneChunkMetadata(entry.getKey(), entry.getValue());
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    if (tempOutput != null) {
+      tempOutput.close();
     }
+    long metaOffset = out.getPosition();
 
-    // construct TsFileMetadata and return
-    return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
-  }
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
-  /**
-   * Flush one chunkMetadata
-   *
-   * @param path Path of chunk
-   * @param chunkMetadataList List of chunkMetadata about path(previous param)
-   */
-  private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
-      throws IOException {
-    // create TimeseriesMetaData
-    PublicBAOS publicBAOS = new PublicBAOS();
-    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
-    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
-    int chunkMetadataListLength = 0;
-    boolean serializeStatistic = (chunkMetadataList.size() > 1);
-    // flush chunkMetadataList one by one
-    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-      if (!chunkMetadata.getDataType().equals(dataType)) {
-        continue;
+    TSMIterator tsmIterator =
+        hasChunkMetadataInDisk
+            ? TSMIterator.getTSMIteratorInDisk(
+                chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+            : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+    Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+    String currentDevice = null;
+    String prevDevice = null;
+    MetadataIndexNode currentIndexNode =
+        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    int seriesIdxForCurrDevice = 0;
+    BloomFilter filter =
+        BloomFilter.getEmptyBloomFilter(
+            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+    int indexCount = 0;
+    while (tsmIterator.hasNext()) {
+      // read in all chunk metadata of one series
+      // construct the timeseries metadata for this series
+      Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
+      TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+      currentSeries = timeseriesMetadataPair.left;
+
+      indexCount++;
+      // build bloom filter
+      filter.add(currentSeries);
+      // construct the index tree node for the series
+      Path currentPath = null;
+      if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+        // this series is the time column of the aligned device
+        // the full series path will be like "root.sg.d."
+        // we remove the last . in the series id here
+        currentPath = new Path(currentSeries);
+        currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+      } else {
+        currentPath = new Path(currentSeries, true);
+        currentDevice = currentPath.getDevice();
+      }
+      if (!currentDevice.equals(prevDevice)) {
+        if (prevDevice != null) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          deviceMetadataIndexMap.put(
+              prevDevice,
+              generateRootNode(
+                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        measurementMetadataIndexQueue = new ArrayDeque<>();
+        seriesIdxForCurrDevice = 0;
+      }
+
+      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+          currentIndexNode.addEntry(
+              new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+        } else {
+          currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+        }
       }
-      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
-      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+
+      prevDevice = currentDevice;
+      seriesIdxForCurrDevice++;
+      // serialize the timeseries metadata to file
+      timeseriesMetadata.serializeTo(out.wrapAsStream());
+    }
+
+    addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+    if (prevDevice != null) {
+      deviceMetadataIndexMap.put(
+          prevDevice,
+          generateRootNode(
+              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
     }
 
-    TimeseriesMetadata timeseriesMetadata =
-        new TimeseriesMetadata(
-            (byte)
-                ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
-            chunkMetadataListLength,
-            path.getMeasurement(),
-            dataType,
-            seriesStatistics,
-            publicBAOS);
-    deviceTimeseriesMetadataMap
-        .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
-        .add(timeseriesMetadata);
+    MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+    TsFileMetadata tsFileMetadata = new TsFileMetadata();
+    tsFileMetadata.setMetadataIndex(metadataIndex);
+    tsFileMetadata.setMetaOffset(metaOffset);
+
+    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());
+  }
+
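+  // Add the number of chunk metadata still kept in memory to pathCount, which is later used to
+  // size the bloom filter.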
+  private void checkInMemoryPathCount() {
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      pathCount += chunkGroupMetadata.getChunkMetadataList().size();
+    }
   }
 
   /**
@@ -399,6 +461,9 @@ public class TsFileIOWriter implements AutoCloseable {
   public void close() throws IOException {
     canWrite = false;
     out.close();
+    if (tempOutput != null) {
+      this.tempOutput.close();
+    }
   }
 
   void writeSeparatorMaskForTest() throws IOException {
@@ -477,6 +542,30 @@ public class TsFileIOWriter implements AutoCloseable {
    * @return DeviceTimeseriesMetadataMap
    */
   public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
+    Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
+    Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        chunkMetadataMap
+            .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
+            .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+    for (String device : chunkMetadataMap.keySet()) {
+      Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
+      for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
+        try {
+          deviceTimeseriesMetadataMap
+              .computeIfAbsent(device, x -> new ArrayList<>())
+              .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
+        } catch (IOException e) {
+          logger.error("Failed to get device timeseries metadata map", e);
+          return null;
+        }
+      }
+    }
+
     return deviceTimeseriesMetadataMap;
   }
 
@@ -495,4 +584,98 @@ public class TsFileIOWriter implements AutoCloseable {
   public void setMaxPlanIndex(long maxPlanIndex) {
     this.maxPlanIndex = maxPlanIndex;
   }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
+   * in row mode, you should make sure that all data of the device currently being written has been
+   * written before this method is called. For writing non-aligned series, or aligned series in
+   * column mode, you should make sure that all data of one series has been written before you call
+   * this function.</b>
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Flushing chunk metadata, total size is {}, count is {}, avg size is {}",
+              currentChunkMetadataSize,
+              chunkMetadataCount,
+              currentChunkMetadataSize / chunkMetadataCount);
+        }
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by series path (in lexicographical order) and by the start time of
+   * the chunk, then flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+        TSMIterator.sortChunkMetadata(
+            chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
+      Path seriesPath = pair.left;
+      boolean isNewPath = !seriesPath.equals(lastSerializePath);
+      if (isNewPath) {
+        // record the number of paths, used later to construct the bloom filter
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = pair.right;
+      writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+      lastSerializePath = seriesPath;
+      logger.debug("Flushing {}", seriesPath);
+    }
+    // clear the cached metadata to release memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+    chunkMetadataCount = 0;
+    currentChunkMetadataSize = 0;
+  }
+
+  private void writeChunkMetadataToTempFile(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
+      throws IOException {
+    // record layout: [DeviceId] measurementId dataType size chunkMetadataBuffer
+    if (lastSerializePath == null
+        || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
+      // mark the end position of last device
+      endPosInCMTForDevice.add(tempOutput.getPosition());
+      // serialize the device
+      // for each device, we only serialize it once, in order to save io
+      ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+    }
+    if (isNewPath && iChunkMetadataList.size() > 0) {
+      // serialize the public info of this measurement
+      ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+      ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+    }
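+    // serialize all chunk metadata of this series into one buffer and write it out,
+    // preceded by its total size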
+    PublicBAOS buffer = new PublicBAOS();
+    int totalSize = 0;
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      totalSize += chunkMetadata.serializeTo(buffer, true);
+    }
+    ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+    buffer.writeTo(tempOutput);
+  }
+
+  public String getCurrentChunkGroupDeviceId() {
+    return currentChunkGroupDeviceId;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (the .cmt file) and from memory (a list of
+ * ChunkGroupMetadata), and constructs TimeseriesMetadata from them. It reads the ChunkMetadata on
+ * disk first; after all ChunkMetadata on disk have been read, it reads the ChunkMetadata in memory.
+ */
+public class DiskTSMIterator extends TSMIterator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class);
+
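+  // positions in the .cmt file where a new device record starts, i.e. where the chunk metadata of
+  // the previous device ends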
+  private LinkedList<Long> endPosForEachDevice;
+  private File cmtFile;
+  private LocalTsFileInput input;
+  private long fileLength = 0;
+  private long currentPos = 0;
+  private long nextEndPosForDevice = 0;
+  private String currentDevice;
+  private boolean remainsInFile = true;
+
+  protected DiskTSMIterator(
+      File cmtFile,
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      LinkedList<Long> endPosForEachDevice)
+      throws IOException {
+    super(chunkGroupMetadataList);
+    this.cmtFile = cmtFile;
+    this.endPosForEachDevice = endPosForEachDevice;
+    this.input = new LocalTsFileInput(cmtFile.toPath());
+    this.fileLength = cmtFile.length();
+    this.nextEndPosForDevice = endPosForEachDevice.removeFirst();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return remainsInFile || iterator.hasNext();
+  }
+
+  @Override
+  public Pair<String, TimeseriesMetadata> next() {
+    try {
+      if (remainsInFile) {
+        // deserialize from file
+        return getTimeSeriesMetadataFromFile();
+      } else {
+        // get from memory iterator
+        return super.next();
+      }
+    } catch (IOException e) {
+      LOG.error("Meets IOException when reading timeseries metadata from disk", e);
+      return null;
+    }
+  }
+
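+  // Each series record in the .cmt file consists of the measurementId, the dataType, the total
+  // size of its chunk metadata and the serialized chunk metadata; the deviceId is written only
+  // once at the start of each device.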
+  private Pair<String, TimeseriesMetadata> getTimeSeriesMetadataFromFile() throws IOException {
+    if (currentPos == nextEndPosForDevice) {
+      // deserialize the current device name
+      currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream());
+      nextEndPosForDevice =
+          endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength;
+    }
+    // deserialize public info for measurement
+    String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream());
+    byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream());
+    TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte);
+    int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream());
+    ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize);
+    int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+    if (readSize < chunkBufferSize) {
+      throw new IOException(
+          String.format(
+              "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize));
+    }
+    chunkBuffer.flip();
+
+    // deserialize chunk metadata from chunk buffer
+    List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+    while (chunkBuffer.hasRemaining()) {
+      chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType));
+    }
+    updateCurrentPos();
+    return new Pair<>(
+        currentDevice + "." + measurementUid,
+        constructOneTimeseriesMetadata(measurementUid, chunkMetadataList));
+  }
+
+  private void updateCurrentPos() throws IOException {
+    currentPos = input.position();
+    if (currentPos >= fileLength) {
+      remainsInFile = false;
+      input.close();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
new file mode 100644
index 0000000000..f11242f296
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * TSMIterator iteratively returns the full path of each series together with its
+ * TimeseriesMetadata. It accepts data sources from memory or disk. The static method
+ * getTSMIteratorInMemory returns a TSMIterator that reads from memory, and the static method
+ * getTSMIteratorInDisk returns a TSMIterator that reads from disk.
+ */
+public class TSMIterator {
+  private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class);
+  protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList;
+  protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator;
+
+  protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null);
+    this.iterator = sortedChunkMetadataList.iterator();
+  }
+
+  public static TSMIterator getTSMIteratorInMemory(
+      List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    return new TSMIterator(chunkGroupMetadataList);
+  }
+
+  public static TSMIterator getTSMIteratorInDisk(
+      File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos)
+      throws IOException {
+    return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos);
+  }
+
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  public Pair<String, TimeseriesMetadata> next() throws IOException {
+    Pair<Path, List<IChunkMetadata>> nextPair = iterator.next();
+    return new Pair<>(
+        nextPair.left.getFullPath(),
+        constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right));
+  }
+
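+  /**
+   * Construct a single TimeseriesMetadata for the given measurement by serializing its chunk
+   * metadata list and merging the statistics of all its chunks.
+   */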
+  public static TimeseriesMetadata constructOneTimeseriesMetadata(
+      String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException {
+    // create TimeseriesMetaData
+    PublicBAOS publicBAOS = new PublicBAOS();
+    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
+    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+    int chunkMetadataListLength = 0;
+    boolean serializeStatistic = (chunkMetadataList.size() > 1);
+    // flush chunkMetadataList one by one
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      if (!chunkMetadata.getDataType().equals(dataType)) {
+        continue;
+      }
+      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
+      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+    }
+
+    TimeseriesMetadata timeseriesMetadata =
+        new TimeseriesMetadata(
+            (byte)
+                ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
+            chunkMetadataListLength,
+            measurementId,
+            dataType,
+            seriesStatistics,
+            publicBAOS);
+    return timeseriesMetadata;
+  }
+
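+  /**
+   * Group the chunk metadata by device and series path, both in lexicographical order. The chunk
+   * metadata of the chunk group that is still being written can be passed in through currentDevice
+   * and chunkMetadataList.
+   */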
+  public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      String currentDevice,
+      List<ChunkMetadata> chunkMetadataList) {
+    Map<String, Map<Path, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = new LinkedList<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>());
+      for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        chunkMetadataMap
+            .get(chunkGroupMetadata.getDevice())
+            .computeIfAbsent(
+                new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()),
+                x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+    if (currentDevice != null) {
+      for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+        chunkMetadataMap
+            .computeIfAbsent(currentDevice, x -> new TreeMap<>())
+            .computeIfAbsent(
+                new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+
+    for (Map.Entry<String, Map<Path, List<IChunkMetadata>>> entry : chunkMetadataMap.entrySet()) {
+      Map<Path, List<IChunkMetadata>> seriesChunkMetadataMap = entry.getValue();
+      for (Map.Entry<Path, List<IChunkMetadata>> seriesChunkMetadataEntry :
+          seriesChunkMetadataMap.entrySet()) {
+        sortedChunkMetadataList.add(
+            new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue()));
+      }
+    }
+    return sortedChunkMetadataList;
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
index 5e518d4280..f7137e95be 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
@@ -234,7 +234,7 @@ public class MetadataIndexConstructorTest {
       assertFalse(iterator.hasNext());
 
       Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
-          reader.getAllTimeseriesMetadata();
+          reader.getAllTimeseriesMetadata(false);
       for (int j = 0; j < actualDevices.size(); j++) {
         for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
           assertEquals(
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 4c2321928a..ba955912f1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -147,7 +147,7 @@ public class TsFileIOWriterTest {
 
     // make sure timeseriesMetadata is only
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
-        reader.getAllTimeseriesMetadata();
+        reader.getAllTimeseriesMetadata(false);
     Set<String> pathSet = new HashSet<>();
     for (Map.Entry<String, List<TimeseriesMetadata>> entry :
         deviceTimeseriesMetadataMap.entrySet()) {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides some static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+  private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+  /**
+   * This method checks the integrity of the file by reading it from start to end. It mainly
+   * checks the integrity of the chunks.
+   *
+   * @param filename
+   */
+  public static void checkIntegrityBySequenceRead(String filename) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      String headMagicString = reader.readHeadMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+      String tailMagicString = reader.readTailMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
+      byte marker;
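+      // walk through the data section chunk by chunk until the metadata separator is reached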
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                TimePageReader timePageReader =
+                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+                timeBatch.add(timePageReader.getNextTimeBatch());
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                ValuePageReader valuePageReader =
+                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+                TsPrimitiveType[] valueBatch =
+                    valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+              } else { // NonAligned Chunk
+                PageReader pageReader =
+                    new PageReader(
+                        pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+                BatchData batchData = pageReader.getAllSatisfiedPageData();
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      Assert.fail();
+    }
+  }
+
+  /**
+   * This method checks the integrity of the file by mimicking the query process: it reads the
+   * metadata index tree first and gets the timeseries metadata list and chunk metadata list. After
+   * that, it acquires each chunk according to its chunk metadata, deserializes the chunk, and
+   * verifies the correctness of the data.
+   *
+   * @param filename File to be checked
+   * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>,
+   *     each inner list stands for a chunk.
+   */
+  public static void checkIntegrityByQuery(
+      String filename,
+      Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+          reader.getAllTimeseriesMetadata(true);
+      Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+      // check each series
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+        String deviceId = entry.getKey();
+        List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+        boolean vectorMode = false;
+        if (timeseriesMetadataList.size() > 0
+            && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) {
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+        } else {
+          vectorMode = true;
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1);
+        }
+
+        if (!vectorMode) {
+          // check integrity of not aligned series
+          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+            // get its chunk metadata list, and read the chunk
+            String measurementId = timeseriesMetadata.getMeasurementId();
+            List<List<Pair<Long, TsPrimitiveType>>> originChunks =
+                originData.get(deviceId).get(measurementId);
+            List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+            Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+            chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+            for (int i = 0; i < chunkMetadataList.size(); ++i) {
+              Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+              ChunkReader chunkReader = new ChunkReader(chunk, null);
+              List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i);
+              // deserialize the chunk and verify it with origin data
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNextTimeValuePair()) {
+                  TimeValuePair pair = pointReader.nextTimeValuePair();
+                  Assert.assertEquals(
+                      originValue.get(valIdx).left.longValue(), pair.getTimestamp());
+                  Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue());
+                }
+              }
+            }
+          }
+        } else {
+          // check integrity of vector type
+          // get the timeseries metadata of the time column
+          TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0);
+          List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList();
+          timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+
+          for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+            // traverse each value column
+            List<IChunkMetadata> valueChunkMetadataList =
+                timeseriesMetadataList.get(i).getChunkMetadataList();
+            Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size());
+            List<List<Pair<Long, TsPrimitiveType>>> originDataChunks =
+                originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId());
+            for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) {
+              Chunk timeChunk =
+                  reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx));
+              Chunk valueChunk =
+                  reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx));
+              // construct an aligned chunk reader using time chunk and value chunk
+              IChunkReader chunkReader =
+                  new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null);
+              // verify the values
+              List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx);
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNext()) {
+                  long time = pointReader.currentTime();
+                  Assert.assertEquals(originValue.get(valIdx).left.longValue(), time);
+                  Assert.assertEquals(
+                      originValue.get(valIdx++).right.getValue(),
+                      ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue());
+                  pointReader.next();
+                }
+              }
+            }
+          }
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      Assert.fail();
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
new file mode 100644
index 0000000000..b7c6ff84db
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TsFileIOWriterMemoryControlTest {
+  private static File testFile = new File("target", "1-1-0-0.tsfile");
+  private static File emptyFile = new File("target", "temp");
+  private long TEST_CHUNK_SIZE = 1000;
+  private List<String> sortedSeriesId = new ArrayList<>();
+  private List<String> sortedDeviceId = new ArrayList<>();
+  private boolean init = false;
+
+  @Before
+  public void setUp() throws IOException {
+    if (!init) {
+      init = true;
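+      // pre-generate device and series ids and keep them sorted lexicographically, which is the
+      // order in which chunk metadata are flushed and read back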
+      for (int i = 0; i < 2048; ++i) {
+        sortedSeriesId.add("s" + i);
+        sortedDeviceId.add("root.sg.d" + i);
+      }
+      sortedSeriesId.sort((String::compareTo));
+      sortedDeviceId.sort((String::compareTo));
+    }
+    TEST_CHUNK_SIZE = 1000;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (testFile.exists()) {
+      FileUtils.delete(testFile);
+    }
+    if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) {
+      FileUtils.delete(
+          new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
+    }
+    if (emptyFile.exists()) {
+      FileUtils.delete(emptyFile);
+    }
+  }
+
+  /** The following tests are for ChunkMetadata serialization and deserialization. */
+  @Test
+  public void testSerializeAndDeserializeChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+              break;
+            case 1:
+              chunkWriter = generateBooleanData(j, 0, new ArrayList<>());
+              break;
+            case 2:
+              chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+              break;
+            case 3:
+              chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+              break;
+            case 4:
+            default:
+              chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+              break;
+          }
+          chunkWriter.writeToFileWriter(writer);
+        }
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile,
+              writer.chunkGroupMetadataList,
+              writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+      }
+    }
+  }
+
+  @Test
+  public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+        chunkWriter.writeToFileWriter(writer);
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      List<String> measurementIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        measurementIds.add(sortedDeviceId.get(i) + ".");
+        for (int j = 1; j <= 6; ++j) {
+          measurementIds.add(sortedDeviceId.get(i) + ".s" + j);
+        }
+      }
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        String fullPath = timeseriesMetadataPair.left;
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(measurementIds.get(i), fullPath);
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+      }
+    }
+  }
+
+  @Test
+  public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      List<String> seriesIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        if (i % 2 == 0) {
+          // write normal series
+          for (int j = 0; j < 5; ++j) {
+            ChunkWriterImpl chunkWriter;
+            switch (j) {
+              case 0:
+                chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+                break;
+              case 2:
+                chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            seriesIds.add(deviceId + "." + sortedSeriesId.get(j));
+          }
+        } else {
+          // write vector
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+          chunkWriter.writeToFileWriter(writer);
+          seriesIds.add(deviceId + ".");
+          for (int l = 1; l <= 6; ++l) {
+            seriesIds.add(deviceId + ".s" + l);
+          }
+        }
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      for (int i = 0; i < originChunkMetadataList.size(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left);
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(),
+            timeseriesMetadataPair.right.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(),
+            timeseriesMetadataPair.right.getStatistics());
+      }
+    }
+  }
+
+  /** The following tests are for writing normal series in different numbers. */
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write one chunk
+   * for it. This test makes sure that the chunk metadata can be flushed to the temp file during
+   * writing and that the resulting file is complete and correct.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              chunkWriter = generateIntData(j, 0L, valList);
+              break;
+            case 1:
+              chunkWriter = generateBooleanData(j, 0L, valList);
+              break;
+            case 2:
+              chunkWriter = generateFloatData(j, 0L, valList);
+              break;
+            case 3:
+              chunkWriter = generateDoubleData(j, 0L, valList);
+              break;
+            case 4:
+            default:
+              chunkWriter = generateTextData(j, 0L, valList);
+              break;
+          }
+          chunkWriter.writeToFileWriter(writer);
+          writer.checkMetadataSizeAndMayFlush();
+          originData
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+              .add(valList);
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+   * for it. This test makes sure that the chunk metadata can be flushed to the temp file during
+   * writing and that the resulting file is complete and correct.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+   * for it. We maintain some chunk metadata in memory when calling endFile().
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          if (i < 9) {
+            writer.checkMetadataSizeAndMayFlush();
+          }
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks
+   * for it. This test makes sure that the chunk metadata can be flushed to the temp file during
+   * writing and that the resulting file is complete and correct.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 2 devices and 1024 series in each device. For each series, we write 50
+   * chunks. This test makes sure that the metadata of each chunk can be flushed to disk and read
+   * back correctly even when the series number is large.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 1;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 1024; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /**
+   * Write a file with 1024 devices and 5 series in each device. For each series, we write 10
+   * chunks. This test makes sure that the metadata of each chunk can be flushed to disk and read
+   * back correctly even when the device number is large.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 1024; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /** The following tests are for writing aligned series. */
+
+  /**
+   * Test writing 10 aligned chunk groups, with 6 series in each group.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
+        for (int j = 1; j <= 6; ++j) {
+          originData
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent("s" + j, x -> new ArrayList<>())
+              .add(valList.get(j - 1));
+        }
+
+        chunkWriter.writeToFileWriter(writer);
+        writer.endChunkGroup();
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Test writing 1 aligned device; for each series we write 512 chunks.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 512, seriesNum = 6;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 1; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < chunkNum; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter =
+              generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+          for (int j = 1; j <= seriesNum; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Test writing aligned chunk metadata; each aligned series has 1024 components.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5, seriesNum = 1024;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try {
+      try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+        for (int i = 0; i < 10; ++i) {
+          String deviceId = sortedDeviceId.get(i);
+          for (int k = 0; k < chunkNum; ++k) {
+            writer.startChunkGroup(deviceId);
+            List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+            AlignedChunkWriterImpl chunkWriter =
+                generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+            for (int j = 1; j <= seriesNum; ++j) {
+              originData
+                  .computeIfAbsent(deviceId, x -> new HashMap<>())
+                  .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                  .add(valList.get(j - 1));
+            }
+
+            chunkWriter.writeToFileWriter(writer);
+            writer.endChunkGroup();
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endFile();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      }
+    } finally {
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
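+  /** Write 1024 aligned devices, each with 5 chunk groups of 12 component series. */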
+  @Test
+  public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5, seriesNum = 12;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    int deviceNum = 1024;
+    try {
+      try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+        for (int i = 0; i < deviceNum; ++i) {
+          String deviceId = sortedDeviceId.get(i);
+          for (int k = 0; k < chunkNum; ++k) {
+            writer.startChunkGroup(deviceId);
+            List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+            AlignedChunkWriterImpl chunkWriter =
+                generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+            for (int j = 1; j <= seriesNum; ++j) {
+              originData
+                  .computeIfAbsent(deviceId, x -> new HashMap<>())
+                  .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                  .add(valList.get(j - 1));
+            }
+
+            chunkWriter.writeToFileWriter(writer);
+            writer.endChunkGroup();
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endFile();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      }
+    } finally {
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
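+  /**
+   * Write aligned devices column by column: 10 time chunks first, then 1024 value columns with 10
+   * chunks each, sorting and flushing the cached chunk metadata after every column.
+   */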
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
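+        // When an aligned device is written column by column, the cached chunk metadata is sorted
+        // and flushed explicitly; presumably this keeps the time and value metadata in a
+        // consistent order when it is spilled to disk.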
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 1024; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
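+  /**
+   * Write a file that mixes aligned devices (the first five) with non-aligned devices (the last
+   * five), to make sure both kinds of chunk metadata survive the flush-to-disk path.
+   */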
+  @Test
+  public void testWritingCompleteMixedFiles() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < 10; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+          for (int j = 1; j <= 6; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      for (int i = 5; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
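+  /**
+   * Write aligned devices column by column with a single chunk per column, flushing chunk
+   * metadata after the time column and after every value column.
+   */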
+  @Test
+  public void testWritingAlignedSeriesByColumn() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        TimeChunkWriter timeChunkWriter =
+            new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+        for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+          timeChunkWriter.write(j);
+        }
+        timeChunkWriter.writeToFileWriter(writer);
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          ValueChunkWriter chunkWriter =
+              new ValueChunkWriter(
+                  sortedSeriesId.get(k),
+                  CompressionType.SNAPPY,
+                  TSDataType.DOUBLE,
+                  TSEncoding.PLAIN,
+                  builder.getEncoder(TSDataType.DOUBLE));
+          Random random = new Random();
+          List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+          for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+            double val = random.nextDouble();
+            chunkWriter.write(j, val, false);
+            valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+          }
+          chunkWriter.writeToFileWriter(writer);
+          originValue
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+              .add(valueList);
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
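+  /**
+   * Same as {@link #testWritingAlignedSeriesByColumn()}, but every time and value column is split
+   * into 10 chunks.
+   */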
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    Assert.assertFalse(
+        new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+            .exists());
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  /** The following methods generate chunk data for normal series and aligned series. */
+  private ChunkWriterImpl generateIntData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      long val = random.nextLong();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateFloatData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      float val = random.nextFloat();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateDoubleData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      double val = random.nextDouble();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateBooleanData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      boolean val = random.nextBoolean();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val)));
+    }
+    return chunkWriter;
+  }
+
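+
+  /**
+   * Builds an aligned chunk writer with {@code seriesNum} value columns whose data types rotate
+   * through INT32, INT64, FLOAT, DOUBLE, BOOLEAN and TEXT, and records every written point in
+   * {@code record}.
+   */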
+  private AlignedChunkWriterImpl generateVectorData(
+      long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.INT32,
+          TSDataType.INT64,
+          TSDataType.FLOAT,
+          TSDataType.DOUBLE,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+    for (int i = 0; i < seriesNum; ++i) {
+      measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6]));
+    }
+    AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+    Random random = new Random();
+    for (int i = 0; i < seriesNum; ++i) {
+      record.add(new ArrayList<>());
+    }
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      TsPrimitiveType[] points = new TsPrimitiveType[seriesNum];
+      for (int j = 0; j < seriesNum; ++j) {
+        switch (j % 6) {
+          case 0:
+            points[j] = new TsPrimitiveType.TsInt(random.nextInt());
+            break;
+          case 1:
+            points[j] = new TsPrimitiveType.TsLong(random.nextLong());
+            break;
+          case 2:
+            points[j] = new TsPrimitiveType.TsFloat(random.nextFloat());
+            break;
+          case 3:
+            points[j] = new TsPrimitiveType.TsDouble(random.nextDouble());
+            break;
+          case 4:
+            points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean());
+            break;
+          case 5:
+            points[j] =
+                new TsPrimitiveType.TsBinary(new Binary(String.valueOf(random.nextDouble())));
+            break;
+        }
+      }
+      for (int j = 0; j < seriesNum; ++j) {
+        record.get(j).add(new Pair<>(i, points[j]));
+      }
+      chunkWriter.write(i, points);
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateTextData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      Binary val = new Binary(String.valueOf(random.nextDouble()));
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val)));
+    }
+    return chunkWriter;
+  }
+}