You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/09/22 01:03:21 UTC

[iotdb] branch master updated: [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 265992dc21 [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)
265992dc21 is described below

commit 265992dc2166d80492a6a9bce3b1768514b23865
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu Sep 22 09:03:15 2022 +0800

    [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)
---
 .../resources/conf/iotdb-datanode.properties       |    8 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   21 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   11 +
 .../rewrite/task/ReadPointPerformerSubTask.java    |   10 +-
 .../utils/AlignedSeriesCompactionExecutor.java     |    1 +
 .../utils/SingleSeriesCompactionExecutor.java      |    1 +
 .../compaction/performer/ICompactionPerformer.java |    4 +-
 .../impl/ReadChunkCompactionPerformer.java         |   15 +-
 .../impl/ReadPointCompactionPerformer.java         |   90 +-
 .../writer/AbstractCompactionWriter.java           |   12 +-
 .../writer/CrossSpaceCompactionWriter.java         |   35 +-
 .../writer/InnerSpaceCompactionWriter.java         |   30 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   23 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |    9 +-
 .../db/engine/storagegroup/TsFileResource.java     |    8 +-
 .../iotdb/db/tools/TsFileSplitByPartitionTool.java |    4 +-
 .../file/AbstractTsFileRecoverPerformer.java       |   18 +-
 .../ReadPointCompactionPerformerTest.java          |   70 +-
 .../compaction/inner/InnerSeqCompactionTest.java   |    7 +-
 .../compaction/inner/InnerUnseqCompactionTest.java |   13 +-
 .../inner/ReadChunkCompactionPerformerOldTest.java |    3 +-
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |    8 +
 .../file/metadata/MetadataIndexConstructor.java    |    9 +-
 .../tsfile/file/metadata/MetadataIndexNode.java    |    2 +-
 .../iotdb/tsfile/file/metadata/TsFileMetadata.java |    9 +-
 .../write/writer/RestorableTsFileIOWriter.java     |   12 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  337 ++++--
 .../write/writer/tsmiterator/DiskTSMIterator.java  |  132 ++
 .../write/writer/tsmiterator/TSMIterator.java      |  147 +++
 .../tsfile/write/TsFileIntegrityCheckingTool.java  |  251 ++++
 .../writer/TsFileIOWriterMemoryControlTest.java    | 1261 ++++++++++++++++++++
 31 files changed, 2359 insertions(+), 202 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 699e30407a..cd759c7220 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -466,6 +466,10 @@ timestamp_precision=ms
 # Datatype: int
 # primitive_array_size=32
 
+# The proportion of write memory that chunk metadata may occupy in a single file writer when flushing a memtable
+# Datatype: double
+# chunk_metadata_size_proportion_in_write=0.1
+
 # Ratio of write memory for invoking flush disk, 0.4 by default
 # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
 # Datatype: double
@@ -582,6 +586,10 @@ timestamp_precision=ms
 # BALANCE: alternate two compaction types
 # compaction_priority=BALANCE
 
+# The proportion of each compaction task's memory that chunk metadata kept in memory may occupy when compacting
+# Datatype: double
+# chunk_metadata_size_proportion_in_compaction=0.05
+
 # The target tsfile size in compaction
 # Datatype: long, Unit: byte
 # target_compaction_file_size=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 734e53fe35..853e37d893 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -149,6 +149,8 @@ public class IoTDBConfig {
   /** The proportion of write memory for memtable */
   private double writeProportion = 0.8;
 
+  private double chunkMetadataSizeProportionInWrite = 0.1;
+
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
@@ -436,6 +438,8 @@ public class IoTDBConfig {
    */
   private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
 
+  private double chunkMetadataSizeProportionInCompaction = 0.05;
+
   /** The target tsfile size in compaction, 1 GB by default */
   private long targetCompactionFileSize = 1073741824L;
 
@@ -3200,6 +3204,23 @@ public class IoTDBConfig {
     this.throttleThreshold = throttleThreshold;
   }
 
+  public double getChunkMetadataSizeProportionInWrite() {
+    return chunkMetadataSizeProportionInWrite;
+  }
+
+  public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) {
+    this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite;
+  }
+
+  public double getChunkMetadataSizeProportionInCompaction() {
+    return chunkMetadataSizeProportionInCompaction;
+  }
+
+  public void setChunkMetadataSizeProportionInCompaction(
+      double chunkMetadataSizeProportionInCompaction) {
+    this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction;
+  }
+
   public long getCacheWindowTimeInMs() {
     return cacheWindowTimeInMs;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a0117f918c..081b54d084 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -629,6 +629,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "concurrent_compaction_thread",
                 Integer.toString(conf.getConcurrentCompactionThread()))));
+    conf.setChunkMetadataSizeProportionInCompaction(
+        Double.parseDouble(
+            properties.getProperty(
+                "chunk_metadata_size_proportion_in_compaction",
+                Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
     conf.setTargetCompactionFileSize(
         Long.parseLong(
             properties.getProperty(
@@ -1458,6 +1463,12 @@ public class IoTDBDescriptor {
       // update tsfile-format config
       loadTsFileProps(properties);
 
+      conf.setChunkMetadataSizeProportionInWrite(
+          Double.parseDouble(
+              properties.getProperty(
+                  "chunk_metadata_size_proportion_in_write",
+                  Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));
+
       // update max_deduplicated_path_num
       conf.setMaxQueryDeduplicatedPathNum(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index bc04c09c33..759975d048 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 
 /**
@@ -45,7 +45,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
   private static final Logger logger =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private final String device;
-  private final Set<String> measurementList;
+  private final List<String> measurementList;
   private final FragmentInstanceContext fragmentInstanceContext;
   private final QueryDataSource queryDataSource;
   private final AbstractCompactionWriter compactionWriter;
@@ -54,7 +54,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
 
   public ReadPointPerformerSubTask(
       String device,
-      Set<String> measurementList,
+      List<String> measurementList,
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
@@ -79,7 +79,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
               device,
               Collections.singletonList(measurement),
               measurementSchemas,
-              schemaMap.keySet(),
+              new ArrayList<>(schemaMap.keySet()),
               fragmentInstanceContext,
               queryDataSource,
               false);
@@ -87,7 +87,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
       if (dataBlockReader.hasNextBatch()) {
         compactionWriter.startMeasurement(measurementSchemas, taskId);
         ReadPointCompactionPerformer.writeWithReader(
-            compactionWriter, dataBlockReader, taskId, false);
+            compactionWriter, dataBlockReader, device, taskId, false);
         compactionWriter.endMeasurement(taskId);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index c9c36378f5..3de4c64a36 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -151,6 +151,7 @@ public class AlignedSeriesCompactionExecutor {
           chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
     }
+    writer.checkMetadataSizeAndMayFlush();
   }
 
   private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 40f2632d0e..d1d4a366e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -151,6 +151,7 @@ public class SingleSeriesCompactionExecutor {
     } else if (pointCountInChunkWriter != 0L) {
       flushChunkWriter();
     }
+    fileWriter.checkMetadataSizeAndMayFlush();
     targetResource.updateStartTime(device, minStartTimestamp);
     targetResource.updateEndTime(device, maxEndTimestamp);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
index 172eb50ee7..2799c3236b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 /**
  * CompactionPerformer is used to compact multiple files into one or multiple files. Different
@@ -35,7 +36,8 @@ import java.util.List;
 public interface ICompactionPerformer {
 
   void perform()
-      throws IOException, MetadataException, StorageEngineException, InterruptedException;
+      throws IOException, MetadataException, StorageEngineException, InterruptedException,
+          ExecutionException;
 
   void setTargetFiles(List<TsFileResource> targetFiles);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index 1a94214848..ac0fa1ddc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.performer.impl;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
 import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
@@ -28,11 +29,11 @@ import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
@@ -63,8 +64,17 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
   @Override
   public void perform()
       throws IOException, MetadataException, InterruptedException, StorageEngineException {
+    // file writer budget: a configurable share (default 5%) of the per-task compaction memory
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
     try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles);
-        TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
+        TsFileIOWriter writer =
+            new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
       while (deviceIterator.hasNextDevice()) {
         Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
         String device = deviceInfo.left;
@@ -138,7 +148,6 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
       checkThreadInterrupted();
       // TODO: we can provide a configuration item to enable concurrent between each series
       PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
-      IMeasurementSchema measurementSchema;
       // TODO: seriesIterator needs to be refactor.
       // This statement must be called before next hasNextSeries() called, or it may be trapped in a
       // dead-loop.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index de9d36ea16..d2940da096 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -61,7 +61,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -97,7 +96,8 @@ public class ReadPointCompactionPerformer
 
   @Override
   public void perform()
-      throws IOException, MetadataException, StorageEngineException, InterruptedException {
+      throws IOException, MetadataException, StorageEngineException, InterruptedException,
+          ExecutionException {
     long queryId = QueryResourceManager.getInstance().assignCompactionQueryId();
     FragmentInstanceContext fragmentInstanceContext =
         FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
@@ -128,7 +128,6 @@ public class ReadPointCompactionPerformer
       }
 
       compactionWriter.endFile();
-      updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
       updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
     } finally {
       QueryResourceManager.getInstance().endQuery(queryId);
@@ -166,7 +165,7 @@ public class ReadPointCompactionPerformer
             device,
             existedMeasurements,
             measurementSchemas,
-            schemaMap.keySet(),
+            new ArrayList<>(schemaMap.keySet()),
             fragmentInstanceContext,
             queryDataSource,
             true);
@@ -175,9 +174,10 @@ public class ReadPointCompactionPerformer
       // chunkgroup is serialized only when at least one timeseries under this device has data
       compactionWriter.startChunkGroup(device, true);
       compactionWriter.startMeasurement(measurementSchemas, 0);
-      writeWithReader(compactionWriter, dataBlockReader, 0, true);
+      writeWithReader(compactionWriter, dataBlockReader, device, 0, true);
       compactionWriter.endMeasurement(0);
       compactionWriter.endChunkGroup();
+      compactionWriter.checkAndMayFlushChunkMetadata();
     }
   }
 
@@ -187,45 +187,36 @@ public class ReadPointCompactionPerformer
       AbstractCompactionWriter compactionWriter,
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
-      throws IOException, InterruptedException {
-    Map<String, MeasurementSchema> measurementSchemaMap = deviceIterator.getAllMeasurementSchemas();
-    int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
-
-    // assign all measurements to different sub tasks
-    Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
-    int idx = 0;
-    for (String measurement : measurementSchemaMap.keySet()) {
-      if (measurementsForEachSubTask[idx % subTaskNums] == null) {
-        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
-      }
-      measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
-    }
-
+      throws IOException, InterruptedException, IllegalPathException, ExecutionException {
+    MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
+        deviceIterator.iterateNotAlignedSeries(device, false);
+    List<String> allMeasurements =
+        new ArrayList<>(deviceIterator.getAllMeasurementSchemas().keySet());
+    allMeasurements.sort((String::compareTo));
+    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+    Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllMeasurementSchemas();
     // construct sub tasks and start compacting measurements in parallel
-    List<Future<Void>> futures = new ArrayList<>();
     compactionWriter.startChunkGroup(device, false);
-    for (int i = 0; i < subTaskNums; i++) {
-      futures.add(
-          CompactionTaskManager.getInstance()
-              .submitSubTask(
-                  new ReadPointPerformerSubTask(
-                      device,
-                      measurementsForEachSubTask[i],
-                      fragmentInstanceContext,
-                      queryDataSource,
-                      compactionWriter,
-                      measurementSchemaMap,
-                      i)));
-    }
-
-    // wait for all sub tasks finish
-    for (int i = 0; i < subTaskNums; i++) {
-      try {
-        futures.get(i).get();
-      } catch (ExecutionException e) {
-        LOGGER.error("[Compaction] SubCompactionTask meet errors ", e);
-        throw new IOException(e);
+    for (int taskCount = 0; taskCount < allMeasurements.size(); ) {
+      List<Future<Void>> futures = new ArrayList<>();
+      for (int i = 0; i < subTaskNums && taskCount < allMeasurements.size(); i++) {
+        futures.add(
+            CompactionTaskManager.getInstance()
+                .submitSubTask(
+                    new ReadPointPerformerSubTask(
+                        device,
+                        Collections.singletonList(allMeasurements.get(taskCount++)),
+                        fragmentInstanceContext,
+                        queryDataSource,
+                        compactionWriter,
+                        schemaMap,
+                        i)));
+      }
+      for (Future<Void> future : futures) {
+        future.get();
       }
+      // wait for all subtasks in this batch, then check the writer's chunk metadata size
+      compactionWriter.checkAndMayFlushChunkMetadata();
     }
 
     compactionWriter.endChunkGroup();
@@ -262,7 +253,7 @@ public class ReadPointCompactionPerformer
       String deviceId,
       List<String> measurementIds,
       List<IMeasurementSchema> measurementSchemas,
-      Set<String> allSensors,
+      List<String> allSensors,
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource,
       boolean isAlign)
@@ -277,11 +268,20 @@ public class ReadPointCompactionPerformer
       tsDataType = measurementSchemas.get(0).getType();
     }
     return new SeriesDataBlockReader(
-        seriesPath, allSensors, tsDataType, fragmentInstanceContext, queryDataSource, true);
+        seriesPath,
+        new HashSet<>(allSensors),
+        tsDataType,
+        fragmentInstanceContext,
+        queryDataSource,
+        true);
   }
 
   public static void writeWithReader(
-      AbstractCompactionWriter writer, IDataBlockReader reader, int subTaskId, boolean isAligned)
+      AbstractCompactionWriter writer,
+      IDataBlockReader reader,
+      String device,
+      int subTaskId,
+      boolean isAligned)
       throws IOException {
     while (reader.hasNextBatch()) {
       TsBlock tsBlock = reader.nextBatch();
@@ -289,6 +289,7 @@ public class ReadPointCompactionPerformer
         writer.write(
             tsBlock.getTimeColumn(),
             tsBlock.getValueColumns(),
+            device,
             subTaskId,
             tsBlock.getPositionCount());
       } else {
@@ -297,6 +298,7 @@ public class ReadPointCompactionPerformer
           TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
           writer.write(
               timeValuePair.getTimestamp(), timeValuePair.getValue().getValue(), subTaskId);
+          writer.updateStartTimeAndEndTime(device, timeValuePair.getTimestamp(), subTaskId);
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 542c44c4f0..ae01567f9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -72,7 +72,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   public abstract void write(long timestamp, Object value, int subTaskId) throws IOException;
 
-  public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+  public abstract void write(
+      TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
       throws IOException;
 
   public abstract void endFile() throws IOException;
@@ -140,6 +141,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
     measurementPointCountArray[subTaskId] += 1;
   }
 
+  public abstract void updateStartTimeAndEndTime(String device, long time, int subTaskId);
+
   protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
       throws IOException {
     writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
@@ -177,4 +180,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   }
 
   public abstract List<TsFileIOWriter> getFileIOWriter();
+
+  public void checkAndMayFlushChunkMetadata() throws IOException {
+    List<TsFileIOWriter> writers = this.getFileIOWriter();
+    for (TsFileIOWriter writer : writers) {
+      writer.checkMetadataSizeAndMayFlush();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 80902dd1d9..d192c0f6d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -37,6 +39,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
 
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
+  private List<TsFileResource> targetTsFileResources;
 
   // Each sub task has its corresponding seq file index.
   // The index of the array corresponds to subTaskId.
@@ -57,11 +60,21 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
       throws IOException {
+    this.targetTsFileResources = targetResources;
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
     isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    long memorySizeForEachWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction()
+                / targetResources.size());
     for (int i = 0; i < targetResources.size(); i++) {
-      this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
+      this.fileWriterList.add(
+          new TsFileIOWriter(targetResources.get(i).getTsFile(), true, memorySizeForEachWriter));
       isEmptyFile[i] = true;
     }
     this.seqTsFileResources = seqFileResources;
@@ -111,12 +124,19 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   }
 
   @Override
-  public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+  public void write(
+      TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
       throws IOException {
     // todo control time range of target tsfile
     checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId);
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
+    synchronized (this) {
+      // we need to synchronize here to avoid multi-thread contention among sub-tasks
+      TsFileResource resource = targetTsFileResources.get(seqFileIndexArray[subTaskId]);
+      resource.updateStartTime(device, timestamps.getStartTime());
+      resource.updateEndTime(device, timestamps.getEndTime());
+    }
     checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
     isEmptyFile[seqFileIndexArray[subTaskId]] = false;
@@ -192,4 +212,15 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
       fileIndex++;
     }
   }
+
+  @Override
+  public void updateStartTimeAndEndTime(String device, long time, int subTaskId) {
+    synchronized (this) {
+      int fileIndex = seqFileIndexArray[subTaskId];
+      TsFileResource resource = targetTsFileResources.get(fileIndex);
+      // we need to synchronize here to avoid multi-thread contention among sub-tasks
+      resource.updateStartTime(device, time);
+      resource.updateEndTime(device, time);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index a73c6c2907..2c3c2e58ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
@@ -32,10 +34,19 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   private TsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
+  private TsFileResource resource;
 
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
-    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getChunkMetadataSizeProportionInCompaction());
+    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter);
     isEmptyFile = true;
+    resource = targetFileResource;
   }
 
   @Override
@@ -65,11 +76,17 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   }
 
   @Override
-  public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+  public void write(
+      TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
       throws IOException {
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
+    synchronized (this) {
+      // we need to synchronize here to avoid multi-thread contention among sub-tasks
+      resource.updateStartTime(device, timestamps.getStartTime());
+      resource.updateEndTime(device, timestamps.getEndTime());
+    }
     isEmptyFile = false;
   }
 
@@ -89,6 +106,15 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
     fileWriter = null;
   }
 
+  @Override
+  public void updateStartTimeAndEndTime(String device, long time, int subTaskId) {
+    // we need to synchronize here to avoid races between concurrent sub-task threads
+    synchronized (this) {
+      resource.updateStartTime(device, time);
+      resource.updateEndTime(device, time);
+    }
+  }
+
   @Override
   public List<TsFileIOWriter> getFileIOWriter() {
     return Collections.singletonList(fileWriter);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index a3ca350fea..6c6ab965b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -38,6 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -114,14 +117,19 @@ public class MemTableFlushTask {
     long sortTime = 0;
 
     // for map do not use get(key) to iterate
-    for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry :
-        memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID()));
-
-      final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
-      for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
+    Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+    List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+    // sort the device IDs in lexicographical order of their string IDs
+    deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+    for (IDeviceID deviceID : deviceIDList) {
+      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID()));
+
+      final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
+      List<String> seriesInOrder = new ArrayList<>(value.keySet());
+      seriesInOrder.sort((String::compareTo));
+      for (String seriesId : seriesInOrder) {
         long startTime = System.currentTimeMillis();
-        IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+        IWritableMemChunk series = value.get(seriesId);
         /*
          * sort task (first task of flush pipeline)
          */
@@ -274,6 +282,7 @@ public class MemTableFlushTask {
               this.writer.endChunkGroup();
             } else {
               ((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
+              writer.checkMetadataSizeAndMayFlush();
             }
           } catch (IOException e) {
             LOGGER.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 570b97421a..336181281b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -178,7 +178,14 @@ public class TsFileProcessor {
     this.storageGroupName = storageGroupName;
     this.tsFileResource = new TsFileResource(tsfile, this);
     this.storageGroupInfo = storageGroupInfo;
-    this.writer = new RestorableTsFileIOWriter(tsfile);
+    this.writer =
+        new RestorableTsFileIOWriter(
+            tsfile,
+            (long)
+                (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                    * IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getChunkMetadataSizeProportionInWrite()));
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
     this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 99e5cfd1c2..050f50cf92 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -505,10 +506,15 @@ public class TsFileResource {
     modFile = null;
   }
 
-  /** Remove the data file, its resource file, and its modification file physically. */
+  /**
+   * Remove the data file, its resource file, its chunk metadata temp file, and its modification
+   * file physically.
+   */
   public boolean remove() {
     try {
       fsFactory.deleteIfExists(file);
+      fsFactory.deleteIfExists(
+          new File(file.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
     } catch (IOException e) {
       LOGGER.error("TsFile {} cannot be deleted: {}", file, e.getMessage());
       return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
index b573ed354d..ddfa8789b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
@@ -492,10 +492,10 @@ public class TsFileSplitByPartitionTool implements AutoCloseable {
 
   protected TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
       throws IOException {
-    tsFileIOWriter.endFile();
-    TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
         tsFileIOWriter.getDeviceTimeseriesMetadataMap();
+    tsFileIOWriter.endFile();
+    TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
     for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
       String device = entry.getKey();
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
index f2c3934ccf..e506d66c3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.db.wal.recover.file;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +58,12 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
    */
   protected void recoverWithWriter() throws DataRegionException, IOException {
     File tsFile = tsFileResource.getTsFile();
+    File chunkMetadataTempFile =
+        new File(tsFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    if (chunkMetadataTempFile.exists()) {
+      // delete chunk metadata temp file
+      FileUtils.delete(chunkMetadataTempFile);
+    }
     if (!tsFile.exists()) {
       logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
       return;
@@ -68,7 +77,14 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
 
     // try to remove corrupted part of the TsFile
     try {
-      writer = new RestorableTsFileIOWriter(tsFile);
+      writer =
+          new RestorableTsFileIOWriter(
+              tsFile,
+              (long)
+                  (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+                      * IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getChunkMetadataSizeProportionInWrite()));
     } catch (NotCompatibleTsFileException e) {
       boolean result = tsFile.delete();
       logger.warn(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index c5b65ed8d3..efda562500 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -65,6 +65,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
 import static org.junit.Assert.assertEquals;
@@ -95,7 +96,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testSeqInnerSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, false);
     createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, true);
 
@@ -170,7 +171,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testSeqInnerSpaceCompactionWithDifferentTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(5, 5, false);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
     createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
@@ -289,7 +290,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testUnSeqInnerSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, false);
     createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, false);
 
@@ -375,7 +376,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(9, 9, false);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, false);
     createFiles(2, 3, 5, 50, 150, 150, 50, 50, false, false);
@@ -501,7 +502,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, false);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -636,7 +637,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, false);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -764,7 +765,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, false);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -856,7 +857,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, true);
     createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, true);
 
@@ -953,7 +954,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(50);
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
@@ -1075,7 +1076,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChunk()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
     createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
@@ -1196,7 +1197,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1329,7 +1330,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1510,7 +1511,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1658,7 +1659,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, true);
     createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, false);
 
@@ -1757,7 +1758,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, false);
     createFiles(5, 2, 3, 100, 0, 0, 0, 0, false, true);
     createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
@@ -1848,7 +1849,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithDifferentTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2034,7 +2035,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2233,7 +2234,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithAllDataDeletedInDevice()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2423,7 +2424,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2590,7 +2591,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2795,7 +2796,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithSameTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     registerTimeseriesInMManger(2, 3, true);
     createFiles(5, 2, 3, 100, 0, 0, 0, 0, true, true);
     createFiles(5, 2, 3, 50, 0, 10000, 50, 50, true, false);
@@ -2895,7 +2896,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithDifferentTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, true);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3060,7 +3061,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, true);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3298,7 +3299,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, true);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3495,7 +3496,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -3618,7 +3619,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, false);
     List<Integer> deviceIndex = new ArrayList<>();
@@ -3758,7 +3759,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 5, false);
     List<Integer> deviceIndex = new ArrayList<>();
@@ -3874,7 +3875,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurementsInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, false);
     List<Integer> deviceIndex = new ArrayList<>();
@@ -4012,7 +4013,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, true);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -4241,7 +4242,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 7, true);
     List<Integer> deviceIndex = new ArrayList<>();
@@ -4510,7 +4511,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(5, 5, true);
     List<Integer> deviceIndex = new ArrayList<>();
@@ -4712,7 +4713,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   @Test
   public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource()
       throws IOException, WriteProcessException, MetadataException, StorageEngineException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     registerTimeseriesInMManger(4, 5, true);
     createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -4907,7 +4908,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   }
 
   @Test
-  public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() {
+  public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     try {
       registerTimeseriesInMManger(6, 6, false);
@@ -4982,7 +4983,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
   }
 
   @Test
-  public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() {
+  public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile()
+      throws ExecutionException {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
     try {
       registerTimeseriesInMManger(6, 6, false);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index 09e17a763f..095a6d260f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putChunk;
@@ -350,7 +351,7 @@ public class InnerSeqCompactionTest {
           }
         }
       }
-    } catch (InterruptedException | StorageEngineException e) {
+    } catch (InterruptedException | StorageEngineException | ExecutionException e) {
       e.printStackTrace();
     } finally {
       IoTDBDescriptor.getInstance()
@@ -365,7 +366,7 @@ public class InnerSeqCompactionTest {
   @Test
   public void testAppendPage()
       throws IOException, MetadataException, InterruptedException, StorageEngineException,
-          WriteProcessException {
+          WriteProcessException, ExecutionException {
 
     for (int toMergeFileNum : toMergeFileNums) {
       for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
@@ -632,7 +633,7 @@ public class InnerSeqCompactionTest {
   @Test
   public void testAppendChunk()
       throws IOException, IllegalPathException, MetadataException, StorageEngineException,
-          WriteProcessException {
+          WriteProcessException, ExecutionException {
     long prevChunkPointNumLowerBoundInCompaction =
         IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
     IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 588b1af97e..1fd50b21fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,10 +59,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putOnePageChunk;
 
 public class InnerUnseqCompactionTest {
+  private static final Logger LOG = LoggerFactory.getLogger(InnerUnseqCompactionTest.class);
   static final String COMPACTION_TEST_SG = "root.compactionTest";
   static final String[] fullPaths =
       new String[] {
@@ -132,7 +136,7 @@ public class InnerUnseqCompactionTest {
   @Test
   public void test()
       throws MetadataException, IOException, StorageEngineException, WriteProcessException,
-          InterruptedException {
+          InterruptedException, ExecutionException {
     for (int toMergeFileNum : toMergeFileNums) {
       for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
         for (boolean compactionBeforeHasMod : compactionBeforeHasMods) {
@@ -351,6 +355,13 @@ public class InnerUnseqCompactionTest {
                       toDeleteTimeseriesAndTime, tsFileResource, false);
                 }
               }
+              LOG.error(
+                  "{} {} {} {} {}",
+                  toMergeFileNum,
+                  compactionTimeseriesType,
+                  compactionBeforeHasMod,
+                  compactionHasMod,
+                  compactionOverlapType);
               TsFileResource targetTsFileResource =
                   CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
                           toMergeResources, false)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
index 59b0ab3d3d..b48d494c0b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
@@ -45,6 +45,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,7 +79,7 @@ public class ReadChunkCompactionPerformerOldTest extends InnerCompactionTest {
   @Test
   public void testCompact()
       throws IOException, MetadataException, InterruptedException, StorageEngineException,
-          WriteProcessException {
+          WriteProcessException, ExecutionException {
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..9ee1f7f566 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata {
     return chunkMetaData;
   }
 
+  public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+    ChunkMetadata chunkMetadata = new ChunkMetadata();
+    chunkMetadata.tsDataType = dataType;
+    chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+    chunkMetadata.statistics = Statistics.deserialize(buffer, dataType);
+    return chunkMetadata;
+  }
+
   @Override
   public long getVersion() {
     return version;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..44cdc8b0bf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -81,6 +81,11 @@ public class MetadataIndexConstructor {
               measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
     }
 
+    return checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+  }
+
+  public static MetadataIndexNode checkAndBuildLevelIndex(
+      Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException {
     // if not exceed the max child nodes num, ignore the device index and directly point to the
     // measurement
     if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
@@ -123,7 +128,7 @@ public class MetadataIndexConstructor {
    * @param out tsfile output
    * @param type MetadataIndexNode type
    */
-  private static MetadataIndexNode generateRootNode(
+  public static MetadataIndexNode generateRootNode(
       Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
       throws IOException {
     int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +153,7 @@ public class MetadataIndexConstructor {
     return metadataIndexNodeQueue.poll();
   }
 
-  private static void addCurrentIndexNodeToQueue(
+  public static void addCurrentIndexNodeToQueue(
       MetadataIndexNode currentIndexNode,
       Queue<MetadataIndexNode> metadataIndexNodeQueue,
       TsFileOutput out)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
     this.children.add(metadataIndexEntry);
   }
 
-  boolean isFull() {
+  public boolean isFull() {
     return children.size() >= config.getMaxDegreeOfIndexNode();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..f6f974fc1a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -105,10 +105,15 @@ public class TsFileMetadata {
    * @param outputStream -output stream to determine byte length
    * @return -byte length
    */
-  public int serializeBloomFilter(OutputStream outputStream, Set<Path> paths) throws IOException {
-    int byteLen = 0;
+  public int buildAndSerializeBloomFilter(OutputStream outputStream, Set<Path> paths)
+      throws IOException {
     BloomFilter filter = buildBloomFilter(paths);
+    return serializeBloomFilter(outputStream, filter);
+  }
 
+  public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+      throws IOException {
+    int byteLen = 0;
     byte[] bytes = filter.serialize();
     byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
     outputStream.write(bytes);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 78253124b8..391426cc34 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     this(file, true);
   }
 
+  /**
+   * @param file a given tsfile path you want to (continue to) write
+   * @throws IOException if write failed, or the file is broken but autoRepair==false.
+   */
+  public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    this(file, true);
+    this.maxMetadataSize = maxMetadataSize;
+    this.enableMemoryControl = true;
+    this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+    this.checkMetadataSizeAndMayFlush();
+  }
+
   public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
     if (logger.isDebugEnabled()) {
       logger.debug("{} is opened.", file.getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 22a68bdfd7..851f03c192 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,35 +26,46 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
 import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.TreeMap;
 
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
 /**
  * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
  */
@@ -76,7 +87,7 @@ public class TsFileIOWriter implements AutoCloseable {
   protected File file;
 
   // current flushed Chunk
-  private ChunkMetadata currentChunkMetadata;
+  protected ChunkMetadata currentChunkMetadata;
   // current flushed ChunkGroup
   protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
   // all flushed ChunkGroups
@@ -93,6 +104,20 @@ public class TsFileIOWriter implements AutoCloseable {
   private long minPlanIndex;
   private long maxPlanIndex;
 
+  // the following variables are used for memory control
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total number of paths in order to build the bloom filter
+  protected int pathCount = 0;
+  protected boolean enableMemoryControl = false;
+  private Path lastSerializePath = null;
+  protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
   /** empty construct function. */
   protected TsFileIOWriter() {}
 
@@ -126,6 +151,15 @@ public class TsFileIOWriter implements AutoCloseable {
     this.out = output;
   }
 
+  /** for writing with memory control */
+  public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+      throws IOException {
+    this(file);
+    this.enableMemoryControl = enableMemoryControl;
+    this.maxMetadataSize = maxMetadataSize;
+    chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total memory size exceeds the
    * chunk group size threshold.
@@ -249,6 +283,9 @@ public class TsFileIOWriter implements AutoCloseable {
 
   /** end chunk and write some log. */
   public void endCurrentChunk() {
+    if (enableMemoryControl) {
+      this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    }
     chunkMetadataList.add(currentChunkMetadata);
     currentChunkMetadata = null;
   }
@@ -260,47 +297,14 @@ public class TsFileIOWriter implements AutoCloseable {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void endFile() throws IOException {
-    long metaOffset = out.getPosition();
-
-    // serialize the SEPARATOR of MetaData
-    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
-    // group ChunkMetadata by series
-    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
-    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
-      List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
-      for (IChunkMetadata chunkMetadata : chunkMetadatas) {
-        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
-        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
-      }
-    }
-
-    MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
-    TsFileMetadata tsFileMetaData = new TsFileMetadata();
-    tsFileMetaData.setMetadataIndex(metadataIndex);
-    tsFileMetaData.setMetaOffset(metaOffset);
+    checkInMemoryPathCount();
+    readChunkMetadataAndConstructIndexTree();
 
     long footerIndex = out.getPosition();
     if (logger.isDebugEnabled()) {
       logger.debug("start to flush the footer,file pos:{}", footerIndex);
     }
 
-    // write TsFileMetaData
-    int size = tsFileMetaData.serializeTo(out.wrapAsStream());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
-    }
-
-    // write bloom filter
-    size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
-    if (logger.isDebugEnabled()) {
-      logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
-    }
-
-    // write TsFileMetaData size
-    ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
-
     // write magic string
     out.write(MAGIC_STRING_BYTES);
 
@@ -312,63 +316,112 @@ public class TsFileIOWriter implements AutoCloseable {
     canWrite = false;
   }
 
-  /**
-   * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
-   *
-   * @param chunkMetadataListMap chunkMetadata that Path.mask == 0
-   * @return MetadataIndexEntry list in TsFileMetadata
-   */
-  private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
-      throws IOException {
+  private void checkInMemoryPathCount() {
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      pathCount += chunkGroupMetadata.getChunkMetadataList().size();
+    }
+  }
 
-    // convert ChunkMetadataList to this field
-    deviceTimeseriesMetadataMap = new LinkedHashMap<>();
-    // create device -> TimeseriesMetaDataList Map
-    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      // for ordinary path
-      flushOneChunkMetadata(entry.getKey(), entry.getValue());
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    if (tempOutput != null) {
+      tempOutput.close();
     }
+    long metaOffset = out.getPosition();
 
-    // construct TsFileMetadata and return
-    return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
-  }
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
-  /**
-   * Flush one chunkMetadata
-   *
-   * @param path Path of chunk
-   * @param chunkMetadataList List of chunkMetadata about path(previous param)
-   */
-  private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
-      throws IOException {
-    // create TimeseriesMetaData
-    PublicBAOS publicBAOS = new PublicBAOS();
-    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
-    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
-    int chunkMetadataListLength = 0;
-    boolean serializeStatistic = (chunkMetadataList.size() > 1);
-    // flush chunkMetadataList one by one
-    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-      if (!chunkMetadata.getDataType().equals(dataType)) {
-        continue;
+    TSMIterator tsmIterator =
+        hasChunkMetadataInDisk
+            ? TSMIterator.getTSMIteratorInDisk(
+                chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+            : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+    Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+    String currentDevice = null;
+    String prevDevice = null;
+    MetadataIndexNode currentIndexNode =
+        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    int seriesIdxForCurrDevice = 0;
+    BloomFilter filter =
+        BloomFilter.getEmptyBloomFilter(
+            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+    int indexCount = 0;
+    while (tsmIterator.hasNext()) {
+      // read in all chunk metadata of one series
+      // construct the timeseries metadata for this series
+      Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
+      TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+      currentSeries = timeseriesMetadataPair.left;
+
+      indexCount++;
+      // build bloom filter
+      filter.add(currentSeries);
+      // construct the index tree node for the series
+      Path currentPath = null;
+      if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+        // this series is the time column of the aligned device
+        // the full series path will be like "root.sg.d."
+        // we remove the last . in the series id here
+        currentPath = new Path(currentSeries);
+        currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+      } else {
+        currentPath = new Path(currentSeries, true);
+        currentDevice = currentPath.getDevice();
       }
-      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
-      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+      if (!currentDevice.equals(prevDevice)) {
+        if (prevDevice != null) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          deviceMetadataIndexMap.put(
+              prevDevice,
+              generateRootNode(
+                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        measurementMetadataIndexQueue = new ArrayDeque<>();
+        seriesIdxForCurrDevice = 0;
+      }
+
+      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+          currentIndexNode.addEntry(
+              new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+        } else {
+          currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+        }
+      }
+
+      prevDevice = currentDevice;
+      seriesIdxForCurrDevice++;
+      // serialize the timeseries metadata to file
+      timeseriesMetadata.serializeTo(out.wrapAsStream());
     }
 
-    TimeseriesMetadata timeseriesMetadata =
-        new TimeseriesMetadata(
-            (byte)
-                ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
-            chunkMetadataListLength,
-            path.getMeasurement(),
-            dataType,
-            seriesStatistics,
-            publicBAOS);
-    deviceTimeseriesMetadataMap
-        .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
-        .add(timeseriesMetadata);
+    addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+    if (prevDevice != null) {
+      deviceMetadataIndexMap.put(
+          prevDevice,
+          generateRootNode(
+              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+    }
+
+    MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+    TsFileMetadata tsFileMetadata = new TsFileMetadata();
+    tsFileMetadata.setMetadataIndex(metadataIndex);
+    tsFileMetadata.setMetaOffset(metaOffset);
+
+    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());
   }
 
   /**
@@ -412,6 +465,9 @@ public class TsFileIOWriter implements AutoCloseable {
   public void close() throws IOException {
     canWrite = false;
     out.close();
+    if (tempOutput != null) {
+      this.tempOutput.close();
+    }
   }
 
   void writeSeparatorMaskForTest() throws IOException {
@@ -490,6 +546,30 @@ public class TsFileIOWriter implements AutoCloseable {
    * @return DeviceTimeseriesMetadataMap
    */
   public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
+    Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
+    Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        chunkMetadataMap
+            .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
+            .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+    for (String device : chunkMetadataMap.keySet()) {
+      Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
+      for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
+        try {
+          deviceTimeseriesMetadataMap
+              .computeIfAbsent(device, x -> new ArrayList<>())
+              .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
+        } catch (IOException e) {
+          logger.error("Failed to get device timeseries metadata map", e);
+          return null;
+        }
+      }
+    }
+
     return deviceTimeseriesMetadataMap;
   }
 
@@ -508,4 +588,85 @@ public class TsFileIOWriter implements AutoCloseable {
   public void setMaxPlanIndex(long maxPlanIndex) {
     this.maxPlanIndex = maxPlanIndex;
   }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
+   * in row, you should make sure all data of current writing device has been written before this
+   * method is called. For writing not aligned series or writing aligned series in column, you
+   * should make sure that all data of one series is written before you call this function.</b>
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+        TSMIterator.sortChunkMetadata(
+            chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
+      Path seriesPath = pair.left;
+      boolean isNewPath = !seriesPath.equals(lastSerializePath);
+      if (isNewPath) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = pair.right;
+      writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+      lastSerializePath = seriesPath;
+      logger.debug("Flushing {}", seriesPath);
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+  }
+
+  private void writeChunkMetadataToTempFile(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
+      throws IOException {
+    // [DeviceId] measurementId datatype size chunkMetadataBuffer
+    if (lastSerializePath == null
+        || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
+      // mark the end position of last device
+      endPosInCMTForDevice.add(tempOutput.getPosition());
+      // serialize the device
+      // for each device, we only serialize it once, in order to save io
+      ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+    }
+    if (isNewPath && iChunkMetadataList.size() > 0) {
+      // serialize the public info of this measurement
+      ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+      ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+    }
+    PublicBAOS buffer = new PublicBAOS();
+    int totalSize = 0;
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      totalSize += chunkMetadata.serializeTo(buffer, true);
+    }
+    ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+    buffer.writeTo(tempOutput);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (.cmt file) and memory (list of
+ * ChunkGroupMetadata), and constructs TimeseriesMetadata from them. It reads the ChunkMetadata on
+ * disk first, and once all of it has been read, it reads the ChunkMetadata in memory.
+ */
+public class DiskTSMIterator extends TSMIterator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class);
+
+  private LinkedList<Long> endPosForEachDevice;
+  private File cmtFile;
+  private LocalTsFileInput input;
+  private long fileLength = 0;
+  private long currentPos = 0;
+  private long nextEndPosForDevice = 0;
+  private String currentDevice;
+  private boolean remainsInFile = true;
+
+  protected DiskTSMIterator(
+      File cmtFile,
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      LinkedList<Long> endPosForEachDevice)
+      throws IOException {
+    super(chunkGroupMetadataList);
+    this.cmtFile = cmtFile;
+    this.endPosForEachDevice = endPosForEachDevice;
+    this.input = new LocalTsFileInput(cmtFile.toPath());
+    this.fileLength = cmtFile.length();
+    this.nextEndPosForDevice = endPosForEachDevice.removeFirst();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return remainsInFile || iterator.hasNext();
+  }
+
+  @Override
+  public Pair<String, TimeseriesMetadata> next() {
+    try {
+      if (remainsInFile) {
+        // deserialize from file
+        return getTimeSerisMetadataFromFile();
+      } else {
+        // get from memory iterator
+        return super.next();
+      }
+    } catch (IOException e) {
+      LOG.error("Meets IOException when reading timeseries metadata from disk", e);
+      return null;
+    }
+  }
+
+  private Pair<String, TimeseriesMetadata> getTimeSerisMetadataFromFile() throws IOException {
+    if (currentPos == nextEndPosForDevice) {
+      // deserialize the current device name
+      currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream());
+      nextEndPosForDevice =
+          endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength;
+    }
+    // deserialize public info for measurement
+    String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream());
+    byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream());
+    TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte);
+    int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream());
+    ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize);
+    int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+    if (readSize < chunkBufferSize) {
+      throw new IOException(
+          String.format(
+              "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize));
+    }
+    chunkBuffer.flip();
+
+    // deserialize chunk metadata from chunk buffer
+    List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+    while (chunkBuffer.hasRemaining()) {
+      chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType));
+    }
+    updateCurrentPos();
+    return new Pair<>(
+        currentDevice + "." + measurementUid,
+        constructOneTimeseriesMetadata(measurementUid, chunkMetadataList));
+  }
+
+  private void updateCurrentPos() throws IOException {
+    currentPos = input.position();
+    if (currentPos >= fileLength) {
+      remainsInFile = false;
+      input.close();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
new file mode 100644
index 0000000000..f11242f296
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * TSMIterator returns full path of series and its TimeseriesMetadata iteratively. It accepts data
+ * source from memory or disk. Static method getTSMIteratorInMemory returns a TSMIterator that reads
+ * from memory, and static method getTSMIteratorInDisk returns a TSMIterator that reads from disk.
+ */
+public class TSMIterator {
+  private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class);
+  protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList;
+  protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator;
+
+  protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null);
+    this.iterator = sortedChunkMetadataList.iterator();
+  }
+
+  public static TSMIterator getTSMIteratorInMemory(
+      List<ChunkGroupMetadata> chunkGroupMetadataList) {
+    return new TSMIterator(chunkGroupMetadataList);
+  }
+
+  public static TSMIterator getTSMIteratorInDisk(
+      File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos)
+      throws IOException {
+    return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos);
+  }
+
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  public Pair<String, TimeseriesMetadata> next() throws IOException {
+    Pair<Path, List<IChunkMetadata>> nextPair = iterator.next();
+    return new Pair<>(
+        nextPair.left.getFullPath(),
+        constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right));
+  }
+
+  public static TimeseriesMetadata constructOneTimeseriesMetadata(
+      String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException {
+    // create TimeseriesMetaData
+    PublicBAOS publicBAOS = new PublicBAOS();
+    TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
+    Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+    int chunkMetadataListLength = 0;
+    boolean serializeStatistic = (chunkMetadataList.size() > 1);
+    // flush chunkMetadataList one by one
+    for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      if (!chunkMetadata.getDataType().equals(dataType)) {
+        continue;
+      }
+      chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
+      seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+    }
+
+    TimeseriesMetadata timeseriesMetadata =
+        new TimeseriesMetadata(
+            (byte)
+                ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
+            chunkMetadataListLength,
+            measurementId,
+            dataType,
+            seriesStatistics,
+            publicBAOS);
+    return timeseriesMetadata;
+  }
+
+  public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      String currentDevice,
+      List<ChunkMetadata> chunkMetadataList) {
+    Map<String, Map<Path, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = new LinkedList<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>());
+      for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        chunkMetadataMap
+            .get(chunkGroupMetadata.getDevice())
+            .computeIfAbsent(
+                new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()),
+                x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+    if (currentDevice != null) {
+      for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+        chunkMetadataMap
+            .computeIfAbsent(currentDevice, x -> new TreeMap<>())
+            .computeIfAbsent(
+                new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>())
+            .add(chunkMetadata);
+      }
+    }
+
+    for (Map.Entry<String, Map<Path, List<IChunkMetadata>>> entry : chunkMetadataMap.entrySet()) {
+      Map<Path, List<IChunkMetadata>> seriesChunkMetadataMap = entry.getValue();
+      for (Map.Entry<Path, List<IChunkMetadata>> seriesChunkMetadataEntry :
+          seriesChunkMetadataMap.entrySet()) {
+        sortedChunkMetadataList.add(
+            new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue()));
+      }
+    }
+    return sortedChunkMetadataList;
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+  private static final Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+  /**
+   * Checks the integrity of a file by reading it from the start to the end. It mainly checks the
+   * integrity of the chunks: the head and tail magic strings are compared with {@link
+   * TSFileConfig#MAGIC_STRING}, then every marker up to the separator is read and every page of
+   * every chunk is decoded. An IOException raised while reading fails the calling test.
+   *
+   * @param filename path of the TsFile to check
+   */
+  public static void checkIntegrityBySequenceRead(String filename) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      String headMagicString = reader.readHeadMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+      String tailMagicString = reader.readTailMagic();
+      Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
+      // start right after the head magic string plus one more byte
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      // time batches of the pages of the most recent time chunk, indexed by page
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
+      byte marker;
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                TimePageReader timePageReader =
+                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+                timeBatch.add(timePageReader.getNextTimeBatch());
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                ValuePageReader valuePageReader =
+                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+                // the decoded values are not inspected here, the page only has to be readable
+                TsPrimitiveType[] valueBatch =
+                    valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+              } else { // NonAligned Chunk
+                PageReader pageReader =
+                    new PageReader(
+                        pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+                // the decoded values are not inspected here, the page only has to be readable
+                BatchData batchData = pageReader.getAllSatisfiedPageData();
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      // carry the cause into the JUnit failure instead of failing without any message
+      Assert.fail("Meet exception when checking integrity of tsfile: " + e.getMessage());
+    }
+  }
+
+  /**
+   * This method checks the integrity of the file by mimicking the process of the query, which reads
+   * the metadata index tree first, and get the timeseries metadata list and chunk metadata list.
+   * After that, this method acquires single chunk according to chunk metadata, then it deserializes
+   * the chunk, and verifies the correctness of the data. An IOException raised while reading fails
+   * the calling test.
+   *
+   * @param filename File to be check
+   * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>,
+   *     each inner list stands for a chunk.
+   */
+  public static void checkIntegrityByQuery(
+      String filename,
+      Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+          reader.getAllTimeseriesMetadata(true);
+      Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+      // check each series
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+        String deviceId = entry.getKey();
+        List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+        boolean vectorMode = false;
+        if (timeseriesMetadataList.size() > 0
+            && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) {
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+        } else {
+          vectorMode = true;
+          // the first metadata of an aligned device is the time column, which has no origin data
+          Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1);
+        }
+
+        if (!vectorMode) {
+          // check integrity of not aligned series
+          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+            // get its chunk metadata list, and read the chunk
+            String measurementId = timeseriesMetadata.getMeasurementId();
+            List<List<Pair<Long, TsPrimitiveType>>> originChunks =
+                originData.get(deviceId).get(measurementId);
+            List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+            Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+            chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+            for (int i = 0; i < chunkMetadataList.size(); ++i) {
+              Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+              ChunkReader chunkReader = new ChunkReader(chunk, null);
+              List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i);
+              // deserialize the chunk and verify it with origin data
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNextTimeValuePair()) {
+                  TimeValuePair pair = pointReader.nextTimeValuePair();
+                  Assert.assertEquals(
+                      originValue.get(valIdx).left.longValue(), pair.getTimestamp());
+                  Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue());
+                }
+              }
+            }
+          }
+        } else {
+          // check integrity of vector type
+          // get the timeseries metadata of the time column
+          TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0);
+          List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList();
+          timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+
+          for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+            // traverse each value column
+            List<IChunkMetadata> valueChunkMetadataList =
+                timeseriesMetadataList.get(i).getChunkMetadataList();
+            Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size());
+            List<List<Pair<Long, TsPrimitiveType>>> originDataChunks =
+                originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId());
+            for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) {
+              Chunk timeChunk =
+                  reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx));
+              Chunk valueChunk =
+                  reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx));
+              // construct an aligned chunk reader using time chunk and value chunk
+              IChunkReader chunkReader =
+                  new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null);
+              // verify the values
+              List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx);
+              for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+                IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator();
+                while (pointReader.hasNext()) {
+                  long time = pointReader.currentTime();
+                  Assert.assertEquals(originValue.get(valIdx).left.longValue(), time);
+                  Assert.assertEquals(
+                      originValue.get(valIdx++).right.getValue(),
+                      ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue());
+                  pointReader.next();
+                }
+              }
+            }
+          }
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.error("Meet exception when checking integrity of tsfile", e);
+      // carry the cause into the JUnit failure instead of failing without any message
+      Assert.fail("Meet exception when checking integrity of tsfile: " + e.getMessage());
+    }
+  }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
new file mode 100644
index 0000000000..44e4af3678
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -0,0 +1,1261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TsFileIOWriterMemoryControlTest {
+  // TsFile that the tests write to; deleted again in tearDown
+  private static File testFile = new File("target", "1-1-0-0.tsfile");
+  // additional temporary file; tearDown deletes it if it exists
+  private static File emptyFile = new File("target", "temp");
+  // reset to 1000 in setUp before every test
+  private long TEST_CHUNK_SIZE = 1000;
+  // "s0" .. "s2047", filled and sorted by String::compareTo in setUp
+  private List<String> sortedSeriesId = new ArrayList<>();
+  // "root.sg.d0" .. "root.sg.d2047", filled and sorted by String::compareTo in setUp
+  private List<String> sortedDeviceId = new ArrayList<>();
+  // set to true by setUp once the two lists above have been filled
+  private boolean init = false;
+
+  /**
+   * Resets the chunk size to 1000 and, if the id lists have not been filled yet, fills them with
+   * 2048 series ids and 2048 device ids and sorts both lists lexicographically.
+   */
+  @Before
+  public void setUp() throws IOException {
+    TEST_CHUNK_SIZE = 1000;
+    if (init) {
+      return;
+    }
+    init = true;
+    for (int idx = 0; idx < 2048; ++idx) {
+      sortedSeriesId.add("s" + idx);
+      sortedDeviceId.add("root.sg.d" + idx);
+    }
+    sortedSeriesId.sort((left, right) -> left.compareTo(right));
+    sortedDeviceId.sort((left, right) -> left.compareTo(right));
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (testFile.exists()) {
+      FileUtils.delete(testFile);
+    }
+    if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) {
+      FileUtils.delete(
+          new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
+    }
+    if (emptyFile.exists()) {
+      FileUtils.delete(emptyFile);
+    }
+  }
+
+  /** The following tests is for ChunkMetadata serialization and deserialization. */
+  @Test
+  public void testSerializeAndDeserializeChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+              break;
+            case 1:
+              chunkWriter = generateBooleanData(j, 0, new ArrayList<>());
+              break;
+            case 2:
+              chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+              break;
+            case 3:
+              chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+              break;
+            case 4:
+            default:
+              chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+              break;
+          }
+          chunkWriter.writeToFileWriter(writer);
+        }
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile,
+              writer.chunkGroupMetadataList,
+              writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+      }
+    }
+  }
+
+  @Test
+  public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+        chunkWriter.writeToFileWriter(writer);
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      List<String> measurementIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        measurementIds.add(sortedDeviceId.get(i) + ".");
+        for (int j = 1; j <= 6; ++j) {
+          measurementIds.add(sortedDeviceId.get(i) + ".s" + j);
+        }
+      }
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        String fullPath = timeseriesMetadataPair.left;
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(measurementIds.get(i), fullPath);
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+      }
+    }
+  }
+
+  @Test
+  public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+      List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      List<String> seriesIds = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        if (i % 2 == 0) {
+          // write normal series
+          for (int j = 0; j < 5; ++j) {
+            ChunkWriterImpl chunkWriter;
+            switch (j) {
+              case 0:
+                chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+                break;
+              case 1:
+                chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+                break;
+              case 2:
+                chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+                break;
+              case 3:
+                chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+                break;
+              case 4:
+              default:
+                chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+                break;
+            }
+            chunkWriter.writeToFileWriter(writer);
+            seriesIds.add(deviceId + "." + sortedSeriesId.get(j));
+          }
+        } else {
+          // write vector
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+          chunkWriter.writeToFileWriter(writer);
+          seriesIds.add(deviceId + ".");
+          for (int l = 1; l <= 6; ++l) {
+            seriesIds.add(deviceId + ".s" + l);
+          }
+        }
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+      for (int i = 0; i < originChunkMetadataList.size(); ++i) {
+        Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left);
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(),
+            timeseriesMetadataPair.right.getTSDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(),
+            timeseriesMetadataPair.right.getStatistics());
+      }
+    }
+  }
+
+  /** The following tests are for writing normal series in different numbers. */
+
+  /**
+   * Writes a file with 10 devices and 5 series in each device, one chunk per series, calling
+   * checkMetadataSizeAndMayFlush after every chunk. The test asserts that chunk metadata has been
+   * persisted to disk before the file is ended, and then verifies the finished file with both
+   * integrity checks of TsFileIntegrityCheckingTool.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 10; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        for (int seriesIdx = 0; seriesIdx < 5; ++seriesIdx) {
+          // the points written into this chunk, kept for the query check below
+          List<Pair<Long, TsPrimitiveType>> points = new ArrayList<>();
+          ChunkWriterImpl chunkWriter;
+          if (seriesIdx == 0) {
+            chunkWriter = generateIntData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 1) {
+            chunkWriter = generateBooleanData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 2) {
+            chunkWriter = generateFloatData(seriesIdx, 0L, points);
+          } else if (seriesIdx == 3) {
+            chunkWriter = generateDoubleData(seriesIdx, 0L, points);
+          } else {
+            chunkWriter = generateTextData(seriesIdx, 0L, points);
+          }
+          chunkWriter.writeToFileWriter(writer);
+          writer.checkMetadataSizeAndMayFlush();
+          String seriesId = sortedSeriesId.get(seriesIdx);
+          originData
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent(seriesId, x -> new ArrayList<>())
+              .add(points);
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    String testFilePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFilePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFilePath, originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks
+   * for it. This test make sure that each chunk
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks
+   * for it. We maintain some chunk metadata in memory when calling endFile().
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          if (i < 9) {
+            writer.checkMetadataSizeAndMayFlush();
+          }
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks
+   * for it. This test make sure that each chunk
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 2 devices and 1024 series in each device. For each series, we write 50 chunks
+   * for it. This test make sure that each chunk
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 1;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 1024; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /**
+   * Write a file with 1024 devices and 5 series in each device. For each series, we write 10 chunks
+   * for it. This test make sure that each chunk
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 1024; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /** The following tests are for writing aligned series. */
+
+  /**
+   * Writes one aligned chunk with 6 value series ("s1".."s6") for each of 10 devices, asking the
+   * writer to check its metadata size after every chunk group. After endFile() the test asserts
+   * hasChunkMetadataInDisk and verifies the file by sequence read and by query.
+   *
+   * @throws IOException if writing or verifying the TsFile fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    final int seriesNum = 6;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 10; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        writer.startChunkGroup(deviceId);
+        List<List<Pair<Long, TsPrimitiveType>>> columns = new ArrayList<>();
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, columns, seriesNum);
+        // Record each generated column under its series name ("s1" is column 0, and so on).
+        Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+            originData.computeIfAbsent(deviceId, x -> new HashMap<>());
+        for (int col = 0; col < seriesNum; ++col) {
+          deviceData
+              .computeIfAbsent("s" + (col + 1), x -> new ArrayList<>())
+              .add(columns.get(col));
+        }
+
+        chunkWriter.writeToFileWriter(writer);
+        writer.endChunkGroup();
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    String filePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(filePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(filePath, originData);
+  }
+
+  /**
+   * Writes 512 aligned chunks, each with 6 value series ("s1".."s6"), for a single device. Every
+   * chunk is written in its own chunk group and the writer checks its metadata size once after
+   * all chunks are written. After endFile() the test asserts hasChunkMetadataInDisk and verifies
+   * the file by sequence read and by query.
+   *
+   * @throws IOException if writing or verifying the TsFile fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 512;
+    int seriesNum = 6;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      // Only the first device is used in this test.
+      String deviceId = sortedDeviceId.get(0);
+      for (int chunkIdx = 0; chunkIdx < chunkNum; ++chunkIdx) {
+        writer.startChunkGroup(deviceId);
+        List<List<Pair<Long, TsPrimitiveType>>> columns = new ArrayList<>();
+        AlignedChunkWriterImpl chunkWriter =
+            generateVectorData(chunkIdx * TEST_CHUNK_SIZE, columns, seriesNum);
+        // Record each generated column under its series name ("s1" is column 0, and so on).
+        Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+            originData.computeIfAbsent(deviceId, x -> new HashMap<>());
+        for (int col = 0; col < seriesNum; ++col) {
+          deviceData
+              .computeIfAbsent("s" + (col + 1), x -> new ArrayList<>())
+              .add(columns.get(col));
+        }
+
+        chunkWriter.writeToFileWriter(writer);
+        writer.endChunkGroup();
+      }
+      writer.checkMetadataSizeAndMayFlush();
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    String filePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(filePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(filePath, originData);
+  }
+
+  /**
+   * Writes aligned chunks with 1024 value series each: 5 chunks for each of 10 devices, with
+   * TEST_CHUNK_SIZE temporarily set to 10. The writer checks its metadata size once per device.
+   * After endFile() the test asserts hasChunkMetadataInDisk and verifies the file by sequence
+   * read and by query.
+   *
+   * @throws IOException if writing or verifying the TsFile fails
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5;
+    int seriesNum = 1024;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < 10; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        for (int chunkIdx = 0; chunkIdx < chunkNum; ++chunkIdx) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> columns = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter =
+              generateVectorData(chunkIdx * TEST_CHUNK_SIZE, columns, seriesNum);
+          // Record each generated column under its series name ("s1" is column 0, and so on).
+          Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+              originData.computeIfAbsent(deviceId, x -> new HashMap<>());
+          for (int col = 0; col < seriesNum; ++col) {
+            deviceData
+                .computeIfAbsent("s" + (col + 1), x -> new ArrayList<>())
+                .add(columns.get(col));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    } finally {
+      // Runs after the writer is closed; puts the shared chunk size back for other tests.
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    String filePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(filePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(filePath, originData);
+  }
+
+  /**
+   * Writes aligned chunks with 12 value series each: 5 chunks for each of 1024 devices, with
+   * TEST_CHUNK_SIZE temporarily set to 10. The writer checks its metadata size once per device.
+   * After endFile() the test asserts hasChunkMetadataInDisk and verifies the file by sequence
+   * read and by query.
+   *
+   * @throws IOException if writing or verifying the TsFile fails
+   */
+  @Test
+  public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5;
+    int seriesNum = 12;
+    int deviceNum = 1024;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int deviceIdx = 0; deviceIdx < deviceNum; ++deviceIdx) {
+        String deviceId = sortedDeviceId.get(deviceIdx);
+        for (int chunkIdx = 0; chunkIdx < chunkNum; ++chunkIdx) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> columns = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter =
+              generateVectorData(chunkIdx * TEST_CHUNK_SIZE, columns, seriesNum);
+          // Record each generated column under its series name ("s1" is column 0, and so on).
+          Map<String, List<List<Pair<Long, TsPrimitiveType>>>> deviceData =
+              originData.computeIfAbsent(deviceId, x -> new HashMap<>());
+          for (int col = 0; col < seriesNum; ++col) {
+            deviceData
+                .computeIfAbsent("s" + (col + 1), x -> new ArrayList<>())
+                .add(columns.get(col));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    } finally {
+      // Runs after the writer is closed; puts the shared chunk size back for other tests.
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    String filePath = testFile.getPath();
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(filePath);
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(filePath, originData);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 1024; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  @Test
+  public void testWritingCompleteMixedFiles() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < 10; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+          for (int j = 1; j <= 6; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      for (int i = 5; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Writes aligned series column by column: for each of the first five devices, one time chunk is
+   * written followed by five DOUBLE value chunks, and {@code sortAndFlushChunkMetadata()} is called
+   * after every chunk. The finished file is then checked by sequential read and by query against
+   * the recorded values.
+   *
+   * @throws IOException if writing the file or checking it fails
+   */
+  @Test
+  public void testWritingAlignedSeriesByColumn() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      // The time-column settings come from the global config, so look them up once.
+      TSEncoding timeEncoding =
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+      TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        // Pass the same encoding the encoder was built for, rather than a hard-coded PLAIN, so the
+        // two arguments cannot disagree.
+        TimeChunkWriter timeChunkWriter =
+            new TimeChunkWriter("", CompressionType.SNAPPY, timeEncoding, encoder);
+        for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+          timeChunkWriter.write(j);
+        }
+        timeChunkWriter.writeToFileWriter(writer);
+        writer.sortAndFlushChunkMetadata();
+        // After an explicit flush the writer must report chunk metadata on disk.
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          ValueChunkWriter chunkWriter =
+              new ValueChunkWriter(
+                  sortedSeriesId.get(k),
+                  CompressionType.SNAPPY,
+                  TSDataType.DOUBLE,
+                  TSEncoding.PLAIN,
+                  builder.getEncoder(TSDataType.DOUBLE));
+          Random random = new Random();
+          List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+          for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+            double val = random.nextDouble();
+            chunkWriter.write(j, val, false);
+            valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+          }
+          chunkWriter.writeToFileWriter(writer);
+          // Remember what was written so the query-based check can compare against it.
+          originValue
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+              .add(valueList);
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  /**
+   * Same scenario as writing aligned series by column, but every column consists of ten chunks:
+   * for each of the first five devices, ten time chunks are written, followed by ten DOUBLE value
+   * chunks for each of five series. {@code sortAndFlushChunkMetadata()} is called once after the
+   * time chunks and once after each series. The finished file is then checked by sequential read
+   * and by query against the recorded values.
+   *
+   * @throws IOException if writing the file or checking it fails
+   */
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      // The time-column settings come from the global config, so look them up once.
+      TSEncoding timeEncoding =
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+      TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          // Pass the same encoding the encoder was built for, rather than a hard-coded PLAIN, so
+          // the two arguments cannot disagree.
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, timeEncoding, encoder);
+          // Widen before multiplying, as the non-aligned tests in this class do.
+          long startTime = (long) TEST_CHUNK_SIZE * chunkIdx;
+          long endTime = (long) TEST_CHUNK_SIZE * (chunkIdx + 1);
+          for (long j = startTime; j < endTime; ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        // After an explicit flush the writer must report chunk metadata on disk.
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            long startTime = (long) TEST_CHUNK_SIZE * chunkIdx;
+            long endTime = (long) TEST_CHUNK_SIZE * (chunkIdx + 1);
+            for (long j = startTime; j < endTime; ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>(j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            // Remember what was written so the query-based check can compare against it.
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  // The following helpers generate random test data for normal series and aligned series.
+  /**
+   * Builds a chunk writer for the INT64 series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random long values at consecutive timestamps starting from {@code
+   * startTime}. Despite the method name, the values are 64-bit.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record output list; every generated (timestamp, value) pair is appended to it
+   * @return the filled chunk writer; this method does not write it to a file writer
+   */
+  private ChunkWriterImpl generateIntData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64);
+    ChunkWriterImpl writer = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      long value = rng.nextLong();
+      writer.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsLong(value)));
+    }
+    return writer;
+  }
+
+  /**
+   * Builds a chunk writer for the FLOAT series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random float values at consecutive timestamps starting from {@code
+   * startTime}.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record output list; every generated (timestamp, value) pair is appended to it
+   * @return the filled chunk writer; this method does not write it to a file writer
+   */
+  private ChunkWriterImpl generateFloatData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT);
+    ChunkWriterImpl writer = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      float value = rng.nextFloat();
+      writer.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsFloat(value)));
+    }
+    return writer;
+  }
+
+  /**
+   * Builds a chunk writer for the DOUBLE series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random double values at consecutive timestamps starting from {@code
+   * startTime}.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record output list; every generated (timestamp, value) pair is appended to it
+   * @return the filled chunk writer; this method does not write it to a file writer
+   */
+  private ChunkWriterImpl generateDoubleData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE);
+    ChunkWriterImpl writer = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      double value = rng.nextDouble();
+      writer.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsDouble(value)));
+    }
+    return writer;
+  }
+
+  /**
+   * Builds a chunk writer for the BOOLEAN series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} random boolean values at consecutive timestamps starting from {@code
+   * startTime}.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record output list; every generated (timestamp, value) pair is appended to it
+   * @return the filled chunk writer; this method does not write it to a file writer
+   */
+  private ChunkWriterImpl generateBooleanData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN);
+    ChunkWriterImpl writer = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      boolean value = rng.nextBoolean();
+      writer.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsBoolean(value)));
+    }
+    return writer;
+  }
+
+  /**
+   * Builds an aligned chunk writer with {@code seriesNum} value columns named {@code s1}, {@code
+   * s2}, ... whose data types cycle through INT32, INT64, FLOAT, DOUBLE, BOOLEAN and TEXT, and
+   * fills it with {@code TEST_CHUNK_SIZE} rows of random values at consecutive timestamps starting
+   * from {@code startTime}.
+   *
+   * @param startTime timestamp of the first generated row
+   * @param record output list; one new list per column is appended and then filled with that
+   *     column's (timestamp, value) pairs. Columns are addressed by index from 0, so the list is
+   *     expected to be empty on entry
+   * @param seriesNum number of value columns to generate
+   * @return the filled aligned chunk writer; this method does not write it to a file writer
+   */
+  private AlignedChunkWriterImpl generateVectorData(
+      long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+    TSDataType[] typeCycle = {
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.BOOLEAN,
+      TSDataType.TEXT
+    };
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    for (int col = 0; col < seriesNum; col++) {
+      schemas.add(new MeasurementSchema("s" + (col + 1), typeCycle[col % 6]));
+    }
+    AlignedChunkWriterImpl alignedWriter = new AlignedChunkWriterImpl(schemas);
+    Random rng = new Random();
+    for (int col = 0; col < seriesNum; col++) {
+      record.add(new ArrayList<>());
+    }
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      TsPrimitiveType[] row = new TsPrimitiveType[seriesNum];
+      for (int col = 0; col < seriesNum; col++) {
+        // Pick the random value type matching the column's position in the type cycle.
+        int kind = col % 6;
+        if (kind == 0) {
+          row[col] = new TsPrimitiveType.TsInt(rng.nextInt());
+        } else if (kind == 1) {
+          row[col] = new TsPrimitiveType.TsLong(rng.nextLong());
+        } else if (kind == 2) {
+          row[col] = new TsPrimitiveType.TsFloat(rng.nextFloat());
+        } else if (kind == 3) {
+          row[col] = new TsPrimitiveType.TsDouble(rng.nextDouble());
+        } else if (kind == 4) {
+          row[col] = new TsPrimitiveType.TsBoolean(rng.nextBoolean());
+        } else if (kind == 5) {
+          row[col] = new TsPrimitiveType.TsBinary(new Binary(String.valueOf(rng.nextDouble())));
+        }
+      }
+      for (int col = 0; col < seriesNum; col++) {
+        record.get(col).add(new Pair<>(time, row[col]));
+      }
+      alignedWriter.write(time, row);
+    }
+    return alignedWriter;
+  }
+
+  /**
+   * Builds a chunk writer for the TEXT series {@code sortedSeriesId.get(idx)} and fills it with
+   * {@code TEST_CHUNK_SIZE} values at consecutive timestamps starting from {@code startTime}. Each
+   * value is the string form of a random double.
+   *
+   * @param idx index into {@code sortedSeriesId} selecting the measurement name
+   * @param startTime timestamp of the first generated point
+   * @param record output list; every generated (timestamp, value) pair is appended to it
+   * @return the filled chunk writer; this method does not write it to a file writer
+   */
+  private ChunkWriterImpl generateTextData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    MeasurementSchema schema = new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT);
+    ChunkWriterImpl writer = new ChunkWriterImpl(schema);
+    Random rng = new Random();
+    long endTime = startTime + TEST_CHUNK_SIZE;
+    for (long time = startTime; time < endTime; time++) {
+      Binary value = new Binary(String.valueOf(rng.nextDouble()));
+      writer.write(time, value);
+      record.add(new Pair<>(time, new TsPrimitiveType.TsBinary(value)));
+    }
+    return writer;
+  }
+}