You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/20 00:53:41 UTC

[iotdb] branch master updated: Add UTs for new insert framework and fix insertRowNode memControl (#5576)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b22a2799 Add UTs for new insert framework and fix insertRowNode memControl (#5576)
79b22a2799 is described below

commit 79b22a2799245ccb02ec022eb5c9b3ee0a6d8721
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Apr 20 08:53:34 2022 +0800

    Add UTs for new insert framework and fix insertRowNode memControl (#5576)
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 214 +++--
 .../db/engine/storagegroup/DataRegionTest.java     | 910 +++++++++++++++++++++
 .../engine/storagegroup/TsFileProcessorTest.java   |  17 +
 ...ocessorTest.java => TsFileProcessorV2Test.java} | 104 ++-
 4 files changed, 1085 insertions(+), 160 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9d5e0d323d..7274954381 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -223,9 +223,15 @@ public class TsFileProcessor {
     long[] memIncrements = null;
     if (enableMemControl) {
       if (insertRowPlan.isAligned()) {
-        memIncrements = checkAlignedMemCostAndAddToTspInfo(insertRowPlan);
+        memIncrements =
+            checkAlignedMemCostAndAddToTspInfo(
+                insertRowPlan.getDevicePath().getFullPath(), insertRowPlan.getMeasurements(),
+                insertRowPlan.getDataTypes(), insertRowPlan.getValues());
       } else {
-        memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan);
+        memIncrements =
+            checkMemCostAndAddToTspInfo(
+                insertRowPlan.getDevicePath().getFullPath(), insertRowPlan.getMeasurements(),
+                insertRowPlan.getDataTypes(), insertRowPlan.getValues());
       }
     }
 
@@ -277,9 +283,15 @@ public class TsFileProcessor {
     long[] memIncrements = null;
     if (enableMemControl) {
       if (insertRowNode.isAligned()) {
-        // memIncrements = checkAlignedMemCostAndAddToTspInfo(insertRowNode);
+        memIncrements =
+            checkAlignedMemCostAndAddToTspInfo(
+                insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(),
+                insertRowNode.getDataTypes(), insertRowNode.getValues());
       } else {
-        // memIncrements = checkMemCostAndAddToTspInfo(insertRowNode);
+        memIncrements =
+            checkMemCostAndAddToTspInfo(
+                insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(),
+                insertRowNode.getDataTypes(), insertRowNode.getValues());
       }
     }
 
@@ -339,9 +351,23 @@ public class TsFileProcessor {
     try {
       if (enableMemControl) {
         if (insertTabletPlan.isAligned()) {
-          memIncrements = checkAlignedMemCostAndAddToTsp(insertTabletPlan, start, end);
+          memIncrements =
+              checkAlignedMemCostAndAddToTsp(
+                  insertTabletPlan.getDevicePath().getFullPath(),
+                  insertTabletPlan.getMeasurements(),
+                  insertTabletPlan.getDataTypes(),
+                  insertTabletPlan.getColumns(),
+                  start,
+                  end);
         } else {
-          memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+          memIncrements =
+              checkMemCostAndAddToTspInfo(
+                  insertTabletPlan.getDevicePath().getFullPath(),
+                  insertTabletPlan.getMeasurements(),
+                  insertTabletPlan.getDataTypes(),
+                  insertTabletPlan.getColumns(),
+                  start,
+                  end);
         }
       }
     } catch (WriteProcessException e) {
@@ -422,9 +448,23 @@ public class TsFileProcessor {
     try {
       if (enableMemControl) {
         if (insertTabletNode.isAligned()) {
-          memIncrements = checkAlignedMemCostAndAddToTsp(insertTabletNode, start, end);
+          memIncrements =
+              checkAlignedMemCostAndAddToTsp(
+                  insertTabletNode.getDevicePath().getFullPath(),
+                  insertTabletNode.getMeasurements(),
+                  insertTabletNode.getDataTypes(),
+                  insertTabletNode.getColumns(),
+                  start,
+                  end);
         } else {
-          memIncrements = checkMemCostAndAddToTspInfo(insertTabletNode, start, end);
+          memIncrements =
+              checkMemCostAndAddToTspInfo(
+                  insertTabletNode.getDevicePath().getFullPath(),
+                  insertTabletNode.getMeasurements(),
+                  insertTabletNode.getDataTypes(),
+                  insertTabletNode.getColumns(),
+                  start,
+                  end);
         }
       }
     } catch (WriteProcessException e) {
@@ -481,43 +521,41 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+  private long[] checkMemCostAndAddToTspInfo(
+      String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
       throws WriteProcessException {
     // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     // get device id
-    IDeviceID deviceID = null;
+    IDeviceID deviceID;
     try {
-      deviceID = getDeviceID(insertRowPlan.getDevicePath().getFullPath());
+      deviceID = getDeviceID(deviceId);
     } catch (IllegalPathException e) {
       throw new WriteProcessException(e);
     }
 
-    for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+    for (int i = 0; i < dataTypes.length; i++) {
       // skip failed Measurements
-      if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) {
+      if (dataTypes[i] == null || measurements[i] == null) {
         continue;
       }
-      if (workMemTable.checkIfChunkDoesNotExist(deviceID, insertRowPlan.getMeasurements()[i])) {
+      if (workMemTable.checkIfChunkDoesNotExist(deviceID, measurements[i])) {
         // ChunkMetadataIncrement
-        chunkMetadataIncrement +=
-            ChunkMetadata.calculateRamSize(
-                insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
-        memTableIncrement += TVList.tvListArrayMemCost(insertRowPlan.getDataTypes()[i]);
+        chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+        memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
       } else {
         // here currentChunkPointNum >= 1
-        long currentChunkPointNum =
-            workMemTable.getCurrentTVListSize(deviceID, insertRowPlan.getMeasurements()[i]);
+        long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceID, measurements[i]);
         memTableIncrement +=
             (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
-                ? TVList.tvListArrayMemCost(insertRowPlan.getDataTypes()[i])
+                ? TVList.tvListArrayMemCost(dataTypes[i])
                 : 0;
       }
       // TEXT data mem size
-      if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) {
-        textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
+      if (dataTypes[i] == TSDataType.TEXT) {
+        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
@@ -525,7 +563,8 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+  private long[] checkAlignedMemCostAndAddToTspInfo(
+      String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
       throws WriteProcessException {
     // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
@@ -533,9 +572,9 @@ public class TsFileProcessor {
     long chunkMetadataIncrement = 0L;
     AlignedWritableMemChunk alignedMemChunk = null;
     // get device id
-    IDeviceID deviceID = null;
+    IDeviceID deviceID;
     try {
-      deviceID = getDeviceID(insertRowPlan.getDevicePath().getFullPath());
+      deviceID = getDeviceID(deviceId);
     } catch (IllegalPathException e) {
       throw new WriteProcessException(e);
     }
@@ -544,74 +583,47 @@ public class TsFileProcessor {
       // ChunkMetadataIncrement
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
-              * insertRowPlan.getDataTypes().length;
-      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(insertRowPlan.getDataTypes());
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
     } else {
       // here currentChunkPointNum >= 1
       long currentChunkPointNum =
           workMemTable.getCurrentTVListSize(deviceID, AlignedPath.VECTOR_PLACEHOLDER);
       memTableIncrement +=
           (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
-              ? AlignedTVList.alignedTvListArrayMemCost(insertRowPlan.getDataTypes())
+              ? AlignedTVList.alignedTvListArrayMemCost(dataTypes)
               : 0;
       alignedMemChunk =
           ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceID))
               .getAlignedMemChunk();
     }
-    for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+    for (int i = 0; i < dataTypes.length; i++) {
       // skip failed Measurements
-      if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) {
+      if (dataTypes[i] == null || measurements[i] == null) {
         continue;
       }
       // extending the column of aligned mem chunk
-      if (alignedMemChunk != null
-          && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+      if (alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) {
         memTableIncrement +=
             (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
-                * insertRowPlan.getDataTypes()[i].getDataTypeSize();
+                * dataTypes[i].getDataTypeSize();
       }
       // TEXT data mem size
-      if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) {
-        textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
+      if (dataTypes[i] == TSDataType.TEXT) {
+        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
     return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
   }
 
-  private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
-      throws WriteProcessException {
-    if (start >= end) {
-      return new long[] {0, 0, 0};
-    }
-    long[] memIncrements = new long[3]; // memTable, text, chunk metadata
-
-    // get device id
-    IDeviceID deviceID = null;
-    try {
-      deviceID = getDeviceID(insertTabletPlan.getDevicePath().getFullPath());
-    } catch (IllegalPathException e) {
-      throw new WriteProcessException(e);
-    }
-
-    for (int i = 0; i < insertTabletPlan.getDataTypes().length; i++) {
-      // skip failed Measurements
-      TSDataType dataType = insertTabletPlan.getDataTypes()[i];
-      String measurement = insertTabletPlan.getMeasurements()[i];
-      Object column = insertTabletPlan.getColumns()[i];
-      if (dataType == null || column == null || measurement == null) {
-        continue;
-      }
-      updateMemCost(dataType, measurement, deviceID, start, end, memIncrements, column);
-    }
-    long memTableIncrement = memIncrements[0];
-    long textDataIncrement = memIncrements[1];
-    long chunkMetadataIncrement = memIncrements[2];
-    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
-    return memIncrements;
-  }
-
-  private long[] checkMemCostAndAddToTspInfo(InsertTabletNode insertTabletNode, int start, int end)
+  private long[] checkMemCostAndAddToTspInfo(
+      String deviceId,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      Object[] columns,
+      int start,
+      int end)
       throws WriteProcessException {
     if (start >= end) {
       return new long[] {0, 0, 0};
@@ -619,53 +631,20 @@ public class TsFileProcessor {
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
     // get device id
-    IDeviceID deviceID = null;
+    IDeviceID deviceID;
     try {
-      deviceID = getDeviceID(insertTabletNode.getDevicePath().getFullPath());
+      deviceID = getDeviceID(deviceId);
     } catch (IllegalPathException e) {
       throw new WriteProcessException(e);
     }
 
-    for (int i = 0; i < insertTabletNode.getDataTypes().length; i++) {
+    for (int i = 0; i < dataTypes.length; i++) {
       // skip failed Measurements
-      TSDataType dataType = insertTabletNode.getDataTypes()[i];
-      String measurement = insertTabletNode.getMeasurementSchemas()[i].getMeasurementId();
-      Object column = insertTabletNode.getColumns()[i];
-      if (dataType == null || column == null || measurement == null) {
+      if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) {
         continue;
       }
-      updateMemCost(dataType, measurement, deviceID, start, end, memIncrements, column);
-    }
-    long memTableIncrement = memIncrements[0];
-    long textDataIncrement = memIncrements[1];
-    long chunkMetadataIncrement = memIncrements[2];
-    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
-    return memIncrements;
-  }
-
-  private long[] checkAlignedMemCostAndAddToTsp(
-      InsertTabletPlan insertTabletPlan, int start, int end) throws WriteProcessException {
-    if (start >= end) {
-      return new long[] {0, 0, 0};
+      updateMemCost(dataTypes[i], measurements[i], deviceID, start, end, memIncrements, columns[i]);
     }
-    long[] memIncrements = new long[3]; // memTable, text, chunk metadata
-
-    // get device id
-    IDeviceID deviceID = null;
-    try {
-      deviceID = getDeviceID(insertTabletPlan.getDevicePath().getFullPath());
-    } catch (IllegalPathException e) {
-      throw new WriteProcessException(e);
-    }
-
-    updateAlignedMemCost(
-        insertTabletPlan.getDataTypes(),
-        deviceID,
-        insertTabletPlan.getMeasurements(),
-        start,
-        end,
-        memIncrements,
-        insertTabletPlan.getColumns());
     long memTableIncrement = memIncrements[0];
     long textDataIncrement = memIncrements[1];
     long chunkMetadataIncrement = memIncrements[2];
@@ -674,28 +653,27 @@ public class TsFileProcessor {
   }
 
   private long[] checkAlignedMemCostAndAddToTsp(
-      InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException {
+      String deviceId,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      Object[] columns,
+      int start,
+      int end)
+      throws WriteProcessException {
     if (start >= end) {
       return new long[] {0, 0, 0};
     }
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
     // get device id
-    IDeviceID deviceID = null;
+    IDeviceID deviceID;
     try {
-      deviceID = getDeviceID(insertTabletNode.getDevicePath().getFullPath());
+      deviceID = getDeviceID(deviceId);
     } catch (IllegalPathException e) {
       throw new WriteProcessException(e);
     }
 
-    updateAlignedMemCost(
-        insertTabletNode.getDataTypes(),
-        deviceID,
-        insertTabletNode.getMeasurements(),
-        start,
-        end,
-        memIncrements,
-        insertTabletNode.getColumns());
+    updateAlignedMemCost(dataTypes, deviceID, measurements, start, end, memIncrements, columns);
     long memTableIncrement = memIncrements[0];
     long textDataIncrement = memIncrements[1];
     long chunkMetadataIncrement = memIncrements[2];
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
new file mode 100644
index 0000000000..a49e05acbd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
+import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataRegionTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(DataRegionTest.class);
+
+  private String storageGroup = "root.vehicle.d0";
+  private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+  private String deviceId = "root.vehicle.d0";
+  private String measurementId = "s0";
+  private DataRegion dataRegion;
+  private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
+
+  @Before
+  public void setUp() throws Exception {
+    MetadataManagerHelper.initMetadata();
+    EnvironmentUtils.envSetUp();
+    dataRegion = new DummyDataRegion(systemDir, storageGroup);
+    CompactionTaskManager.getInstance().start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    dataRegion.syncDeleteDataFiles();
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+    CompactionTaskManager.getInstance().stop();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  public static InsertRowNode buildInsertRowNodeByTSRecord(TSRecord record)
+      throws IllegalPathException {
+    String[] measurements = new String[record.dataPointList.size()];
+    MeasurementSchema[] measurementSchemas = new MeasurementSchema[record.dataPointList.size()];
+    TSDataType[] dataTypes = new TSDataType[record.dataPointList.size()];
+    Object[] values = new Object[record.dataPointList.size()];
+    for (int i = 0; i < record.dataPointList.size(); i++) {
+      measurements[i] = record.dataPointList.get(i).getMeasurementId();
+      measurementSchemas[i] =
+          new MeasurementSchema(
+              measurements[i],
+              record.dataPointList.get(i).getType(),
+              TSEncoding.PLAIN,
+              CompressionType.UNCOMPRESSED);
+      dataTypes[i] = record.dataPointList.get(i).getType();
+      values[i] = record.dataPointList.get(i).getValue();
+    }
+    QueryId queryId = new QueryId("test_write");
+    return new InsertRowNode(
+        queryId.genPlanNodeId(),
+        new PartialPath(record.deviceId),
+        false,
+        measurementSchemas,
+        dataTypes,
+        record.time,
+        values);
+  }
+
+  @Test
+  public void testUnseqUnsealedDelete()
+      throws WriteProcessException, IOException, MetadataException, TriggerExecutionException {
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    for (int j = 1; j <= 10; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+
+    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+      tsfileProcessor.syncFlush();
+    }
+
+    for (int j = 11; j <= 20; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+
+    PartialPath fullPath =
+        new MeasurementPath(
+            deviceId,
+            measurementId,
+            new MeasurementSchema(
+                measurementId,
+                TSDataType.INT32,
+                TSEncoding.RLE,
+                CompressionType.UNCOMPRESSED,
+                Collections.emptyMap()));
+
+    dataRegion.delete(new PartialPath(deviceId, measurementId), 0, 15L, -1, null);
+
+    List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
+    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+      tsfileProcessor.query(
+          Collections.singletonList(fullPath),
+          EnvironmentUtils.TEST_QUERY_CONTEXT,
+          tsfileResourcesForQuery);
+    }
+
+    Assert.assertEquals(1, tsfileResourcesForQuery.size());
+    List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
+    long time = 16;
+    for (ReadOnlyMemChunk memChunk : memChunks) {
+      IPointReader iterator = memChunk.getPointReader();
+      while (iterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+        Assert.assertEquals(time++, timeValuePair.getTimestamp());
+      }
+    }
+  }
+
+  @Test
+  public void testSequenceSyncClose()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    for (int j = 1; j <= 10; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
+            context,
+            null,
+            null);
+    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
+  @Test
+  public void testInsertDataAndRemovePartitionAndInsert()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    for (int j = 0; j < 10; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    dataRegion.removePartitions((storageGroupName, timePartitionId) -> true);
+
+    for (int j = 0; j < 10; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
+            context,
+            null,
+            null);
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+  }
+
+  @Test
+  public void testIoTDBTabletWriteAndSyncClose()
+      throws QueryProcessException, IllegalPathException, TriggerExecutionException {
+    String[] measurements = new String[2];
+    measurements[0] = "s0";
+    measurements[1] = "s1";
+    TSDataType[] dataTypes = new TSDataType[2];
+    dataTypes[0] = TSDataType.INT32;
+    dataTypes[1] = TSDataType.INT64;
+
+    MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+    measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN);
+    measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN);
+
+    long[] times = new long[100];
+    Object[] columns = new Object[2];
+    columns[0] = new int[100];
+    columns[1] = new long[100];
+
+    for (int r = 0; r < 100; r++) {
+      times[r] = r;
+      ((int[]) columns[0])[r] = 1;
+      ((long[]) columns[1])[r] = 1;
+    }
+
+    InsertTabletNode insertTabletNode1 =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.vehicle.d0"),
+            false,
+            measurementSchemas,
+            dataTypes,
+            times,
+            null,
+            columns,
+            times.length);
+
+    dataRegion.insertTablet(insertTabletNode1);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+    for (int r = 50; r < 149; r++) {
+      times[r - 50] = r;
+      ((int[]) columns[0])[r - 50] = 1;
+      ((long[]) columns[1])[r - 50] = 1;
+    }
+
+    InsertTabletNode insertTabletNode2 =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath("root.vehicle.d0"),
+            false,
+            measurementSchemas,
+            dataTypes,
+            times,
+            null,
+            columns,
+            times.length);
+
+    dataRegion.insertTablet(insertTabletNode2);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
+            context,
+            null,
+            null);
+
+    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
+  @Test
+  public void testSeqAndUnSeqSyncClose()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    for (int j = 21; j <= 30; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    for (int j = 10; j >= 1; j--) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    }
+
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
+            context,
+            null,
+            null);
+    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
+  /**
+   * With {@code enableDiscardOutOfOrderData} switched on, writes rows with timestamps 21..30
+   * followed by rows with the older timestamps 10..1 and verifies that the query reports ten
+   * closed sequence resources and no unsequence resource.
+   *
+   * <p>The previous value of the flag is restored in a {@code finally} block so that it is put
+   * back even when one of the assertions fails.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
+      throws WriteProcessException, QueryProcessException, IllegalPathException, IOException,
+          TriggerExecutionException {
+    boolean defaultValue = config.isEnableDiscardOutOfOrderData();
+    config.setEnableDiscardOutOfOrderData(true);
+    try {
+      for (int j = 21; j <= 30; j++) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      // timestamps older than everything written above
+      for (int j = 10; j >= 1; j--) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+      Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+      for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      config.setEnableDiscardOutOfOrderData(defaultValue);
+    }
+  }
+
+  /**
+   * With {@code enableDiscardOutOfOrderData} and partitioning switched on and a partition interval
+   * of 100, inserts a tablet with timestamps 0..99 and then a second tablet with timestamps
+   * 50..149, and verifies that the query reports two closed sequence resources and no unsequence
+   * resource.
+   *
+   * <p>The three modified config values are restored in a {@code finally} block so that they are
+   * put back even when one of the assertions fails.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertTablet1()
+      throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    long defaultTimePartition = config.getPartitionInterval();
+    boolean defaultEnablePartition = config.isEnablePartition();
+    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnablePartition(true);
+    config.setPartitionInterval(100);
+    try {
+      TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64};
+      MeasurementSchema[] measurementSchemas = {
+        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
+        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
+      };
+
+      long[] times = new long[100];
+      int[] intColumn = new int[100];
+      long[] longColumn = new long[100];
+      Object[] columns = {intColumn, longColumn};
+
+      for (int r = 0; r < 100; r++) {
+        times[r] = r;
+        intColumn[r] = 1;
+        longColumn[r] = 1;
+      }
+      InsertTabletNode insertTabletNode1 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode1);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+      // the same arrays are refilled for the second tablet: timestamps become 50..149
+      for (int r = 149; r >= 50; r--) {
+        times[r - 50] = r;
+        intColumn[r - 50] = 1;
+        longColumn[r - 50] = 1;
+      }
+      InsertTabletNode insertTabletNode2 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode2);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+      config.setPartitionInterval(defaultTimePartition);
+      config.setEnablePartition(defaultEnablePartition);
+    }
+  }
+
+  /**
+   * With {@code enableDiscardOutOfOrderData} and partitioning switched on and a partition interval
+   * of 1200, inserts a tablet with timestamps 0..1199 and then a second tablet with timestamps
+   * 50..1249, and verifies that the query reports two closed sequence resources and no unsequence
+   * resource.
+   *
+   * <p>The three modified config values are restored in a {@code finally} block so that they are
+   * put back even when one of the assertions fails.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertTablet2()
+      throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    long defaultTimePartition = config.getPartitionInterval();
+    boolean defaultEnablePartition = config.isEnablePartition();
+    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnablePartition(true);
+    config.setPartitionInterval(1200);
+    try {
+      TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64};
+      MeasurementSchema[] measurementSchemas = {
+        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
+        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
+      };
+
+      long[] times = new long[1200];
+      int[] intColumn = new int[1200];
+      long[] longColumn = new long[1200];
+      Object[] columns = {intColumn, longColumn};
+
+      for (int r = 0; r < 1200; r++) {
+        times[r] = r;
+        intColumn[r] = 1;
+        longColumn[r] = 1;
+      }
+      InsertTabletNode insertTabletNode1 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode1);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+      // the same arrays are refilled for the second tablet: timestamps become 50..1249
+      for (int r = 1249; r >= 50; r--) {
+        times[r - 50] = r;
+        intColumn[r - 50] = 1;
+        longColumn[r - 50] = 1;
+      }
+      InsertTabletNode insertTabletNode2 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode2);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+      config.setPartitionInterval(defaultTimePartition);
+      config.setEnablePartition(defaultEnablePartition);
+    }
+  }
+
+  /**
+   * With {@code enableDiscardOutOfOrderData} and partitioning switched on and a partition interval
+   * of 1000, inserts a tablet with timestamps 0..1199 and then a second tablet with timestamps
+   * 50..1249, and verifies that the query reports two closed sequence resources and no unsequence
+   * resource.
+   *
+   * <p>The three modified config values are restored in a {@code finally} block so that they are
+   * put back even when one of the assertions fails.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertTablet3()
+      throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    long defaultTimePartition = config.getPartitionInterval();
+    boolean defaultEnablePartition = config.isEnablePartition();
+    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnablePartition(true);
+    config.setPartitionInterval(1000);
+    try {
+      TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64};
+      MeasurementSchema[] measurementSchemas = {
+        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
+        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
+      };
+
+      long[] times = new long[1200];
+      int[] intColumn = new int[1200];
+      long[] longColumn = new long[1200];
+      Object[] columns = {intColumn, longColumn};
+
+      for (int r = 0; r < 1200; r++) {
+        times[r] = r;
+        intColumn[r] = 1;
+        longColumn[r] = 1;
+      }
+      InsertTabletNode insertTabletNode1 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode1);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+      // the same arrays are refilled for the second tablet: timestamps become 50..1249
+      for (int r = 1249; r >= 50; r--) {
+        times[r - 50] = r;
+        intColumn[r - 50] = 1;
+        longColumn[r - 50] = 1;
+      }
+      InsertTabletNode insertTabletNode2 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode2);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+      config.setPartitionInterval(defaultTimePartition);
+      config.setEnablePartition(defaultEnablePartition);
+    }
+  }
+
+  /**
+   * Writes ten rows with timestamps 21..30 and ten rows with timestamps 10..1 (an async close is
+   * requested after each insert), triggers {@code dataRegion.compact()}, waits until {@link
+   * CompactionTaskManager} reports no executing task, and then verifies that the query sees two
+   * sequence resources and that every returned resource is closed.
+   *
+   * <p>The compaction settings changed for this test are restored in a {@code finally} block so
+   * that they are put back even when the test fails. The wait is bounded to 120 seconds; an
+   * interruption while waiting fails the test after re-asserting the thread's interrupt flag.
+   */
+  @Test
+  public void testMerge()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    int originCandidateFileNum =
+        IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(9);
+    boolean originEnableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    boolean originEnableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
+    try {
+      for (int j = 21; j <= 30; j++) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (int j = 10; j >= 1; j--) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      dataRegion.compact();
+
+      // poll every 100 ms until no compaction task is executing
+      long totalWaitingTime = 0;
+      while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to the test runner instead of swallowing it
+          Thread.currentThread().interrupt();
+          Assert.fail("interrupted while waiting for compaction tasks to finish");
+        }
+        totalWaitingTime += 100;
+        if (totalWaitingTime % 1000 == 0) {
+          logger.warn("has waited for {} seconds", totalWaitingTime / 1000);
+        }
+        if (totalWaitingTime > 120_000) {
+          // Assert.fail throws, so no explicit break is needed to leave the loop
+          Assert.fail("compaction tasks did not finish within 120 seconds");
+        }
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+      for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setMaxInnerCompactionCandidateFileNum(originCandidateFileNum);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableSeqSpaceCompaction(originEnableSeqSpaceCompaction);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableUnseqSpaceCompaction(originEnableUnseqSpaceCompaction);
+    }
+  }
+
+  /**
+   * Creates ten rows (timestamps 0..9, an async close requested after each), submits an inner
+   * space compaction task over the sequence files, deletes the storage group shortly afterwards
+   * and then verifies that neither the source files, the compaction target file nor the compaction
+   * log file exist, that the system is not read-only and that compaction is still allowed.
+   */
+  @Test
+  public void testDeleteStorageGroupWhenCompacting() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(10);
+    try {
+      for (int time = 0; time < 10; time++) {
+        TSRecord row = new TSRecord(time, deviceId);
+        DataPoint point =
+            DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time));
+        row.addTuple(point);
+        dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      InnerSpaceCompactionTask compactionTask =
+          new InnerSpaceCompactionTask(
+              0,
+              dataRegion.getTsFileManager(),
+              dataRegion.getSequenceFileList(),
+              true,
+              new ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()),
+              new AtomicInteger(0));
+      CompactionTaskManager.getInstance().submitTask(compactionTask);
+      // short pause before the delete, longer pause after it
+      Thread.sleep(20);
+      StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
+      Thread.sleep(500);
+
+      // none of the source files may be left on disk
+      for (TsFileResource sourceResource : dataRegion.getSequenceFileList()) {
+        Assert.assertFalse(sourceResource.getTsFile().exists());
+      }
+
+      // neither the compaction target nor its log file may be left on disk
+      TsFileResource targetResource =
+          TsFileNameGenerator.getInnerCompactionTargetFileResource(
+              dataRegion.getSequenceFileList(), true);
+      File targetFile = targetResource.getTsFile();
+      Assert.assertFalse(targetFile.exists());
+      String logFilePath =
+          targetFile.getParent()
+              + File.separator
+              + targetFile.getName()
+              + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
+      Assert.assertFalse(new File(logFilePath).exists());
+
+      Assert.assertFalse(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+      Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
+    } finally {
+      new CompactionConfigRestorer().restoreCompactionConfig();
+    }
+  }
+
+  /**
+   * Inserts one row so that a single sequence memtable exists, enables timed flushing of sequence
+   * memtables with a flush interval of 5, reboots the timed service, calls {@code
+   * dataRegion.timedFlushSeqMemTable()} and waits until the processor and the {@link FlushManager}
+   * report no outstanding flush work; afterwards no memtable may be left.
+   *
+   * <p>The two modified config values are restored in a {@code finally} block so that they are put
+   * back even when the test fails. The wait is bounded to 120 seconds (the same limit {@code
+   * testMerge} uses) so that a stuck flush fails the test instead of hanging the build.
+   */
+  @Test
+  public void testTimedFlushSeqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException,
+          TriggerExecutionException, ShutdownException {
+    // create one sequence memtable
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // change config & reboot timed service
+    boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+    long prevFlushInterval = config.getSeqMemtableFlushInterval();
+    config.setEnableTimedFlushSeqMemtable(true);
+    config.setSeqMemtableFlushInterval(5);
+    try {
+      StorageEngine.getInstance().rebootTimedService();
+
+      Thread.sleep(500);
+
+      Assert.assertEquals(1, dataRegion.getWorkSequenceTsFileProcessors().size());
+      TsFileProcessor tsFileProcessor =
+          dataRegion.getWorkSequenceTsFileProcessors().iterator().next();
+      FlushManager flushManager = FlushManager.getInstance();
+
+      // flush the sequence memtable
+      dataRegion.timedFlushSeqMemTable();
+
+      // wait until memtable flush task is done; each iteration sleeps 500 ms
+      int waitCnt = 0;
+      while (tsFileProcessor.getFlushingMemTableSize() != 0
+          || tsFileProcessor.isManagedByFlushManager()
+          || flushManager.getNumberOfPendingTasks() != 0
+          || flushManager.getNumberOfPendingSubTasks() != 0
+          || flushManager.getNumberOfWorkingTasks() != 0
+          || flushManager.getNumberOfWorkingSubTasks() != 0) {
+        Thread.sleep(500);
+        ++waitCnt;
+        if (waitCnt % 10 == 0) {
+          logger.info("already wait {} s", waitCnt / 2);
+        }
+        if (waitCnt >= 240) {
+          Assert.fail("sequence memtable flush did not finish within 120 seconds");
+        }
+      }
+
+      Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+    } finally {
+      config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+      config.setSeqMemtableFlushInterval(prevFlushInterval);
+    }
+  }
+
+  /**
+   * Inserts and closes one row at timestamp 10000, then inserts a row at timestamp 1 so that a
+   * single unsequence memtable exists, enables timed flushing of unsequence memtables with a flush
+   * interval of 5, reboots the timed service, calls {@code dataRegion.timedFlushUnseqMemTable()}
+   * and waits until the processor and the {@link FlushManager} report no outstanding flush work;
+   * afterwards no memtable may be left.
+   *
+   * <p>The two modified config values are restored in a {@code finally} block so that they are put
+   * back even when the test fails. The wait is bounded to 120 seconds (the same limit {@code
+   * testMerge} uses) so that a stuck flush fails the test instead of hanging the build.
+   */
+  @Test
+  public void testTimedFlushUnseqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException,
+          TriggerExecutionException, ShutdownException {
+    // create one sequence memtable & close
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // create one unsequence memtable
+    record = new TSRecord(1, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // change config & reboot timed service
+    boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
+    long prevFlushInterval = config.getUnseqMemtableFlushInterval();
+    config.setEnableTimedFlushUnseqMemtable(true);
+    config.setUnseqMemtableFlushInterval(5);
+    try {
+      StorageEngine.getInstance().rebootTimedService();
+
+      Thread.sleep(500);
+
+      Assert.assertEquals(1, dataRegion.getWorkUnsequenceTsFileProcessors().size());
+      TsFileProcessor tsFileProcessor =
+          dataRegion.getWorkUnsequenceTsFileProcessors().iterator().next();
+      FlushManager flushManager = FlushManager.getInstance();
+
+      // flush the unsequence memtable
+      dataRegion.timedFlushUnseqMemTable();
+
+      // wait until memtable flush task is done; each iteration sleeps 500 ms
+      int waitCnt = 0;
+      while (tsFileProcessor.getFlushingMemTableSize() != 0
+          || tsFileProcessor.isManagedByFlushManager()
+          || flushManager.getNumberOfPendingTasks() != 0
+          || flushManager.getNumberOfPendingSubTasks() != 0
+          || flushManager.getNumberOfWorkingTasks() != 0
+          || flushManager.getNumberOfWorkingSubTasks() != 0) {
+        Thread.sleep(500);
+        ++waitCnt;
+        if (waitCnt % 10 == 0) {
+          logger.info("already wait {} s", waitCnt / 2);
+        }
+        if (waitCnt >= 240) {
+          Assert.fail("unsequence memtable flush did not finish within 120 seconds");
+        }
+      }
+
+      Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+    } finally {
+      config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
+      config.setUnseqMemtableFlushInterval(prevFlushInterval);
+    }
+  }
+
+  /**
+   * Minimal {@link DataRegion} subclass for tests. It only forwards to the super constructor with
+   * a fixed second argument of {@code "0"} and a {@link TsFileFlushPolicy.DirectFlushPolicy}.
+   */
+  static class DummyDataRegion extends DataRegion {
+
+    /**
+     * @param systemInfoDir passed through as the first super-constructor argument
+     * @param storageGroupName passed through as the last super-constructor argument
+     * @throws DataRegionException if the super constructor throws it
+     */
+    DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
+      // NOTE(review): "0" is presumably the data region id - confirm against DataRegion's ctor
+      super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index ef769fb736..df6b02d4e8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -311,6 +311,15 @@ public class TsFileProcessorTest {
     Assert.assertEquals(828424, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(720360, memTable.memSize());
+    // Test records
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      processor.insert(new InsertRowPlan(record));
+    }
+    Assert.assertEquals(830120, memTable.getTVListsRamCost());
+    Assert.assertEquals(90100, memTable.getTotalPointsNum());
+    Assert.assertEquals(721560, memTable.memSize());
   }
 
   @Test
@@ -337,6 +346,14 @@ public class TsFileProcessorTest {
     Assert.assertEquals(1656000, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(1440000, memTable.memSize());
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      processor.insert(new InsertRowPlan(record));
+    }
+    Assert.assertEquals(1657696, memTable.getTVListsRamCost());
+    Assert.assertEquals(90100, memTable.getTotalPointsNum());
+    Assert.assertEquals(1441200, memTable.memSize());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
similarity index 84%
copy from server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index ef769fb736..f97bf51030 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -24,16 +24,15 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -45,7 +44,6 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
@@ -64,30 +62,35 @@ import java.util.List;
 import java.util.Map;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.engine.storagegroup.DataRegionTest.buildInsertRowNodeByTSRecord;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-public class TsFileProcessorTest {
+public class TsFileProcessorV2Test {
 
   private TsFileProcessor processor;
-  private String storageGroup = "root.vehicle";
-  private StorageGroupInfo sgInfo = new StorageGroupInfo(null);
-  private String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
-  private String deviceId = "root.vehicle.d0";
-  private String measurementId = "s0";
-  private TSDataType dataType = TSDataType.INT32;
-  private TSEncoding encoding = TSEncoding.RLE;
-  private Map<String, String> props = Collections.emptyMap();
+  private final String storageGroup = "root.vehicle";
+  private StorageGroupInfo sgInfo;
+  private final String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
+  private final String deviceId = "root.vehicle.d0";
+  private final String measurementId = "s0";
+  private final TSDataType dataType = TSDataType.INT32;
+  private final TSEncoding encoding = TSEncoding.RLE;
+  private final Map<String, String> props = Collections.emptyMap();
   private QueryContext context;
-  private static Logger logger = LoggerFactory.getLogger(TsFileProcessorTest.class);
+  private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+  private static final Logger logger = LoggerFactory.getLogger(TsFileProcessorV2Test.class);
+
+  public TsFileProcessorV2Test() {}
 
   @Before
-  public void setUp() {
+  public void setUp() throws DataRegionException {
     File file = new File(filePath);
     if (!file.getParentFile().exists()) {
       Assert.assertTrue(file.getParentFile().mkdirs());
     }
     EnvironmentUtils.envSetUp();
+    sgInfo = new StorageGroupInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
     MetadataManagerHelper.initMetadata();
     context = EnvironmentUtils.TEST_QUERY_CONTEXT;
   }
@@ -127,7 +130,7 @@ public class TsFileProcessorTest {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertRowPlan(record));
+      processor.insert(buildInsertRowNodeByTSRecord(record));
     }
 
     // query data in memory
@@ -186,7 +189,7 @@ public class TsFileProcessorTest {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertRowPlan(record));
+      processor.insert(buildInsertRowNodeByTSRecord(record));
     }
 
     // query data in memory
@@ -274,7 +277,7 @@ public class TsFileProcessorTest {
       for (int i = 1; i <= 10; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-        processor.insert(new InsertRowPlan(record));
+        processor.insert(buildInsertRowNodeByTSRecord(record));
       }
       processor.asyncFlush();
     }
@@ -302,15 +305,25 @@ public class TsFileProcessorTest {
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
-    processor.insertTablet(genInsertTablePlan(0, true), 0, 10, new TSStatus[10]);
+    // Test Tablet
+    processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10]);
     IMemTable memTable = processor.getWorkMemTable();
     Assert.assertEquals(828424, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTablePlan(100, true), 0, 10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(100, true), 0, 10, new TSStatus[10]);
     Assert.assertEquals(828424, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTablePlan(200, true), 0, 10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(200, true), 0, 10, new TSStatus[10]);
     Assert.assertEquals(828424, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(720360, memTable.memSize());
+    // Test records
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      processor.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    Assert.assertEquals(830120, memTable.getTVListsRamCost());
+    Assert.assertEquals(90100, memTable.getTotalPointsNum());
+    Assert.assertEquals(721560, memTable.memSize());
   }
 
   @Test
@@ -328,15 +341,25 @@ public class TsFileProcessorTest {
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
-    processor.insertTablet(genInsertTablePlan(0, false), 0, 10, new TSStatus[10]);
+    // Test tablet
+    processor.insertTablet(genInsertTableNode(0, false), 0, 10, new TSStatus[10]);
     IMemTable memTable = processor.getWorkMemTable();
     Assert.assertEquals(1656000, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTablePlan(100, false), 0, 10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(100, false), 0, 10, new TSStatus[10]);
     Assert.assertEquals(1656000, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTablePlan(200, false), 0, 10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(200, false), 0, 10, new TSStatus[10]);
     Assert.assertEquals(1656000, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(1440000, memTable.memSize());
+    // Test records
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      processor.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    Assert.assertEquals(1657696, memTable.getTVListsRamCost());
+    Assert.assertEquals(90100, memTable.getTotalPointsNum());
+    Assert.assertEquals(1441200, memTable.memSize());
   }
 
   @Test
@@ -369,7 +392,7 @@ public class TsFileProcessorTest {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertRowPlan(record));
+      processor.insert(buildInsertRowNodeByTSRecord(record));
     }
 
     // query data in memory
@@ -414,25 +437,19 @@ public class TsFileProcessorTest {
     }
   }
 
-  private InsertTabletPlan genInsertTablePlan(long startTime, boolean isAligned)
+  private InsertTabletNode genInsertTableNode(long startTime, boolean isAligned)
       throws IllegalPathException {
     String deviceId = "root.sg.device5";
     String[] measurements = new String[3000];
-    List<Integer> dataTypesList = new ArrayList<>();
     TSDataType[] dataTypes = new TSDataType[3000];
     TSEncoding[] encodings = new TSEncoding[3000];
-    IMeasurementMNode[] mNodes = new IMeasurementMNode[3000];
+    MeasurementSchema[] schemas = new MeasurementSchema[3000];
     for (int i = 0; i < 3000; i++) {
       measurements[i] = "s" + i;
-      dataTypesList.add(TSDataType.INT64.ordinal());
       dataTypes[i] = TSDataType.INT64;
       encodings[i] = TSEncoding.PLAIN;
-      IMeasurementSchema schema =
-          new MeasurementSchema(measurements[i], dataTypes[i], encodings[i]);
-      mNodes[i] = MeasurementMNode.getMeasurementMNode(null, measurements[i], schema, null);
+      schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i], encodings[i]);
     }
-    InsertTabletPlan insertTabletPlan =
-        new InsertTabletPlan(new PartialPath(deviceId), measurements, dataTypesList);
 
     long[] times = new long[10];
     Object[] columns = new Object[3000];
@@ -446,12 +463,15 @@ public class TsFileProcessorTest {
         ((long[]) columns[i])[(int) r] = r;
       }
     }
-    insertTabletPlan.setTimes(times);
-    insertTabletPlan.setColumns(columns);
-    insertTabletPlan.setRowCount(times.length);
-    insertTabletPlan.setMeasurementMNodes(mNodes);
-    insertTabletPlan.setAligned(isAligned);
-
-    return insertTabletPlan;
+    return new InsertTabletNode(
+        new QueryId("test_write").genPlanNodeId(),
+        new PartialPath(deviceId),
+        isAligned,
+        schemas,
+        dataTypes,
+        times,
+        null,
+        columns,
+        times.length);
   }
 }