You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/09 09:47:38 UTC

[iotdb] branch master updated: [IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 99c824f4d0 [IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)
99c824f4d0 is described below

commit 99c824f4d0d370c73eb97635c256e4ed990a7df0
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Sat Apr 9 17:47:33 2022 +0800

    [IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)
---
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../resources/conf/iotdb-engine.properties         |   6 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   7 +
 .../engine/compaction/CompactionTaskManager.java   |  18 ++
 .../db/engine/compaction/CompactionUtils.java      | 120 +++++---
 .../cross/rewrite/task/SubCompactionTask.java      | 107 +++++++
 .../writer/AbstractCompactionWriter.java           |  64 ++--
 .../writer/CrossSpaceCompactionWriter.java         |  80 ++---
 .../writer/InnerSpaceCompactionWriter.java         |  27 +-
 .../db/engine/compaction/CompactionUtilsTest.java  | 321 ++++++++++++++++++++-
 .../compaction/cross/CrossSpaceCompactionTest.java |   3 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   8 +-
 13 files changed, 642 insertions(+), 134 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f6ea467f74..4df1111ea7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -37,6 +37,7 @@ public enum ThreadName {
   FLUSH_SERVICE("Flush"),
   FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
   COMPACTION_SERVICE("Compaction"),
+  COMPACTION_SUB_SERVICE("Sub-Compaction"),
   COMPACTION_SCHEDULE("Compaction_Schedule"),
   WAL_DAEMON("WAL-Sync"),
   WAL_FORCE_DAEMON("WAL-Force"),
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3d19ffeab5..a16b373130 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -525,6 +525,12 @@ timestamp_precision=ms
 # Datatype: int
 # query_timeout_threshold=60000
 
+# The number of sub compaction threads to be set up to perform compaction.
+# Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
+# Set to 1 when less than or equal to 0.
+# Datatype: int
+# sub_compaction_thread_num=4
+
 ####################
 ### Metadata Cache Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b4bde3f2b8..54464410d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -442,6 +442,12 @@ public class IoTDBConfig {
   /** The interval of compaction task submission from queue in CompactionTaskMananger */
   private long compactionSubmissionIntervalInMs = 60_000L;
 
+  /**
+   * The number of sub compaction threads to be set up to perform compaction. Currently only works
+   * for nonAligned data in cross space compaction and unseq inner space compaction.
+   */
+  private int subCompactionTaskNum = 4;
+
   /** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
   private boolean metaDataCacheEnable = true;
 
@@ -2552,6 +2558,14 @@ public class IoTDBConfig {
     compactionSubmissionIntervalInMs = interval;
   }
 
+  public int getSubCompactionTaskNum() {
+    return subCompactionTaskNum;
+  }
+
+  public void setSubCompactionTaskNum(int subCompactionTaskNum) {
+    this.subCompactionTaskNum = subCompactionTaskNum;
+  }
+
   public String getDeviceIDTransformationMethod() {
     return deviceIDTransformationMethod;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6a88bd8333..ec955aa40a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -380,6 +380,13 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "compaction_priority", conf.getCompactionPriority().toString())));
 
+      int subtaskNum =
+          Integer.parseInt(
+              properties.getProperty(
+                  "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
+      subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
+      conf.setSubCompactionTaskNum(subtaskNum);
+
       conf.setQueryTimeoutThreshold(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 7d7c9ecb4b..bf54c18b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -55,6 +56,10 @@ public class CompactionTaskManager implements IService {
   // The thread pool that executes the compaction task. The default number of threads for this pool
   // is 10.
   private WrappedScheduledExecutorService taskExecutionPool;
+
+  // The thread pool that executes the sub compaction task.
+  private ScheduledExecutorService subCompactionTaskExecutionPool;
+
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
   private FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
       new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
@@ -86,6 +91,11 @@ public class CompactionTaskManager implements IService {
               IoTDBThreadPoolFactory.newScheduledThreadPool(
                   IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
                   ThreadName.COMPACTION_SERVICE.getName());
+      this.subCompactionTaskExecutionPool =
+          IoTDBThreadPoolFactory.newScheduledThreadPool(
+              IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                  * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
+              ThreadName.COMPACTION_SUB_SERVICE.getName());
       currentTaskNum = new AtomicInteger(0);
       compactionTaskSubmissionThreadPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName());
@@ -310,6 +320,14 @@ public class CompactionTaskManager implements IService {
             : "taskExecutionPool is terminated");
   }
 
+  public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
+    if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
+      Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
+      return future;
+    }
+    return null;
+  }
+
   /**
    * Abort all compactions of a storage group. The running compaction tasks will be returned as a
    * list, the compaction threads for the storage group are not terminated util all the tasks in the
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 6e9cf68291..a4328449ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask;
 import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
@@ -44,12 +45,14 @@ import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,22 +60,26 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
  * This tool can be used to perform inner space or cross space compaction of aligned and non aligned
  * timeseries . Currently, we use {@link
  * org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils} to speed up if it is
- * an inner space compaction.
+ * a seq inner space compaction.
  */
 public class CompactionUtils {
   private static final Logger logger =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final int subTaskNum =
+      IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
   public static void compact(
       List<TsFileResource> seqFileResources,
@@ -108,6 +115,7 @@ public class CompactionUtils {
       }
 
       compactionWriter.endFile();
+      updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter);
       updatePlanIndexes(targetFileResources, seqFileResources, unseqFileResources);
     } finally {
       QueryResourceManager.getInstance().endQuery(queryId);
@@ -157,9 +165,9 @@ public class CompactionUtils {
     if (dataBatchReader.hasNextBatch()) {
       // chunkgroup is serialized only when at least one timeseries under this device has data
       compactionWriter.startChunkGroup(device, true);
-      compactionWriter.startMeasurement(measurementSchemas);
-      writeWithReader(compactionWriter, dataBatchReader);
-      compactionWriter.endMeasurement();
+      compactionWriter.startMeasurement(measurementSchemas, 0);
+      writeWithReader(compactionWriter, dataBatchReader, 0);
+      compactionWriter.endMeasurement(0);
       compactionWriter.endChunkGroup();
     }
   }
@@ -170,59 +178,58 @@ public class CompactionUtils {
       AbstractCompactionWriter compactionWriter,
       QueryContext queryContext,
       QueryDataSource queryDataSource)
-      throws MetadataException, IOException {
-    boolean hasStartChunkGroup = false;
+      throws IOException, InterruptedException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
         deviceIterator.iterateNotAlignedSeries(device, false);
     Set<String> allMeasurements = measurementIterator.getAllMeasurements();
+    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+
+    // assign all measurements to different sub tasks
+    Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
+    int idx = 0;
     for (String measurement : allMeasurements) {
-      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-      try {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-          measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
-        } else {
-          measurementSchemas.add(
-              IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
-        }
-      } catch (PathNotExistException e) {
-        logger.info("A deleted path is skipped: {}", e.getMessage());
-        continue;
+      if (measurementsForEachSubTask[idx % subTaskNums] == null) {
+        measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
       }
+      measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
+    }
 
-      IBatchReader dataBatchReader =
-          constructReader(
-              device,
-              Collections.singletonList(measurement),
-              measurementSchemas,
-              allMeasurements,
-              queryContext,
-              queryDataSource,
-              false);
+    // construct sub tasks and start compacting measurements in parallel
+    List<Future<Void>> futures = new ArrayList<>();
+    compactionWriter.startChunkGroup(device, false);
+    for (int i = 0; i < subTaskNums; i++) {
+      futures.add(
+          CompactionTaskManager.getInstance()
+              .submitSubTask(
+                  new SubCompactionTask(
+                      device,
+                      measurementsForEachSubTask[i],
+                      queryContext,
+                      queryDataSource,
+                      compactionWriter,
+                      i)));
+    }
 
-      if (dataBatchReader.hasNextBatch()) {
-        if (!hasStartChunkGroup) {
-          // chunkgroup is serialized only when at least one timeseries under this device has
-          // data
-          compactionWriter.startChunkGroup(device, false);
-          hasStartChunkGroup = true;
-        }
-        compactionWriter.startMeasurement(measurementSchemas);
-        writeWithReader(compactionWriter, dataBatchReader);
-        compactionWriter.endMeasurement();
+    // wait for all sub tasks to finish
+    for (int i = 0; i < subTaskNums; i++) {
+      try {
+        futures.get(i).get();
+      } catch (InterruptedException | ExecutionException e) {
+        logger.error("SubCompactionTask meet errors ", e);
+        Thread.interrupted();
+        throw new InterruptedException();
       }
     }
 
-    if (hasStartChunkGroup) {
-      compactionWriter.endChunkGroup();
-    }
+    compactionWriter.endChunkGroup();
   }
 
-  private static void writeWithReader(AbstractCompactionWriter writer, IBatchReader reader)
-      throws IOException {
+  public static void writeWithReader(
+      AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException {
     while (reader.hasNextBatch()) {
       BatchData batchData = reader.nextBatch();
       while (batchData.hasCurrent()) {
-        writer.write(batchData.currentTime(), batchData.currentValue());
+        writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId);
         batchData.next();
       }
     }
@@ -232,7 +239,7 @@ public class CompactionUtils {
    * @param measurementIds if device is aligned, then measurementIds contain all measurements. If
    *     device is not aligned, then measurementIds only contain one measurement.
    */
-  private static IBatchReader constructReader(
+  public static IBatchReader constructReader(
       String deviceId,
       List<String> measurementIds,
       List<IMeasurementSchema> measurementSchemas,
@@ -268,6 +275,29 @@ public class CompactionUtils {
     }
   }
 
+  private static void updateDeviceStartTimeAndEndTime(
+      List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) {
+    List<TsFileIOWriter> targetFileWriters = compactionWriter.getFileIOWriter();
+    for (int i = 0; i < targetFileWriters.size(); i++) {
+      TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
+      TsFileResource fileResource = targetResources.get(i);
+      // The tmp target file may not have any data points written due to the existence of the
+      // mods file, and it will be deleted after compaction. So skip the target file that has been
+      // deleted.
+      if (!fileResource.getTsFile().exists()) {
+        continue;
+      }
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry :
+          fileIOWriter.getDeviceTimeseriesMetadataMap().entrySet()) {
+        String device = entry.getKey();
+        for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+          fileResource.updateStartTime(device, timeseriesMetadata.getStatistics().getStartTime());
+          fileResource.updateEndTime(device, timeseriesMetadata.getStatistics().getEndTime());
+        }
+      }
+    }
+  }
+
   private static void updatePlanIndexes(
       List<TsFileResource> targetResources,
       List<TsFileResource> seqResources,
@@ -280,7 +310,7 @@ public class CompactionUtils {
     // in the new file
     for (int i = 0; i < targetResources.size(); i++) {
       TsFileResource targetResource = targetResources.get(i);
-      // remove the target file been deleted from list
+      // remove the target file that has been deleted from list
       if (!targetResource.getTsFile().exists()) {
         targetResources.remove(i--);
         continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
new file mode 100644
index 0000000000..315122e3cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * This class is used to implement reading the measurements and writing to the target files in
+ * parallel in the compaction. Currently, it only works for nonAligned data in cross space
+ * compaction and unseq inner space compaction.
+ */
+public class SubCompactionTask implements Callable<Void> {
+  private static final Logger logger =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final String device;
+  private final Set<String> measurementList;
+  private final QueryContext queryContext;
+  private final QueryDataSource queryDataSource;
+  private final AbstractCompactionWriter compactionWriter;
+  private final int taskId;
+
+  public SubCompactionTask(
+      String device,
+      Set<String> measurementList,
+      QueryContext queryContext,
+      QueryDataSource queryDataSource,
+      AbstractCompactionWriter compactionWriter,
+      int taskId) {
+    this.device = device;
+    this.measurementList = measurementList;
+    this.queryContext = queryContext;
+    this.queryDataSource = queryDataSource;
+    this.compactionWriter = compactionWriter;
+    this.taskId = taskId;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    for (String measurement : measurementList) {
+      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+      try {
+        if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+          measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
+        } else {
+          measurementSchemas.add(
+              IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
+        }
+      } catch (PathNotExistException e) {
+        logger.info("A deleted path is skipped: {}", e.getMessage());
+        continue;
+      }
+
+      IBatchReader dataBatchReader =
+          CompactionUtils.constructReader(
+              device,
+              Collections.singletonList(measurement),
+              measurementSchemas,
+              measurementList,
+              queryContext,
+              queryDataSource,
+              false);
+
+      if (dataBatchReader.hasNextBatch()) {
+        compactionWriter.startMeasurement(measurementSchemas, taskId);
+        CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId);
+        compactionWriter.endMeasurement(taskId);
+      }
+    }
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 053631a516..5c1460230d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.compaction.CompactionMetricsManager;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -38,8 +37,12 @@ import java.io.IOException;
 import java.util.List;
 
 public abstract class AbstractCompactionWriter implements AutoCloseable {
+  protected static final int subTaskNum =
+      IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
-  protected IChunkWriter chunkWriter;
+  // Each sub task has its own chunk writer.
+  // The index of the array corresponds to subTaskId.
+  protected IChunkWriter[] chunkWriters = new IChunkWriter[subTaskNum];
 
   protected boolean isAlign;
 
@@ -49,25 +52,26 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   private final boolean enableMetrics =
       MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric();
 
-  // point count in current measurment, which is used to check size
-  private int measurementPointCount;
+  // Each sub task has its own point count in the current measurement, used to check size.
+  // The index of the array corresponds to subTaskId.
+  protected int[] measurementPointCountArray = new int[subTaskNum];
 
   public abstract void startChunkGroup(String deviceId, boolean isAlign) throws IOException;
 
   public abstract void endChunkGroup() throws IOException;
 
-  public void startMeasurement(List<IMeasurementSchema> measurementSchemaList) {
-    measurementPointCount = 0;
+  public void startMeasurement(List<IMeasurementSchema> measurementSchemaList, int subTaskId) {
+    measurementPointCountArray[subTaskId] = 0;
     if (isAlign) {
-      chunkWriter = new AlignedChunkWriterImpl(measurementSchemaList);
+      chunkWriters[subTaskId] = new AlignedChunkWriterImpl(measurementSchemaList);
     } else {
-      chunkWriter = new ChunkWriterImpl(measurementSchemaList.get(0), true);
+      chunkWriters[subTaskId] = new ChunkWriterImpl(measurementSchemaList.get(0), true);
     }
   }
 
-  public abstract void endMeasurement() throws IOException;
+  public abstract void endMeasurement(int subTaskId) throws IOException;
 
-  public abstract void write(long timestamp, Object value) throws IOException;
+  public abstract void write(long timestamp, Object value, int subTaskId) throws IOException;
 
   public abstract void write(long[] timestamps, Object values);
 
@@ -75,9 +79,9 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
 
   public abstract void close() throws IOException;
 
-  protected void writeDataPoint(Long timestamp, Object value) {
+  protected void writeDataPoint(Long timestamp, Object value, int subTaskId) {
     if (!isAlign) {
-      ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriter;
+      ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriters[subTaskId];
       switch (chunkWriter.getDataType()) {
         case TEXT:
           chunkWriter.write(timestamp, (Binary) value);
@@ -101,7 +105,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
           throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
       }
     } else {
-      AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriter;
+      AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
       for (TsPrimitiveType val : (TsPrimitiveType[]) value) {
         if (val == null) {
           chunkWriter.write(timestamp, null, true);
@@ -133,28 +137,37 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
       }
       chunkWriter.write(timestamp);
     }
-    measurementPointCount++;
+    measurementPointCountArray[subTaskId] += 1;
   }
 
-  protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter) throws IOException {
-    if (measurementPointCount % 10 == 0 && checkChunkSize()) {
-      writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
+  protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
+      throws IOException {
+    writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
+    synchronized (targetWriter) {
+      chunkWriters[subTaskId].writeToFileWriter(targetWriter);
+    }
+  }
+
+  protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter, int subTaskId)
+      throws IOException {
+    if (measurementPointCountArray[subTaskId] % 10 == 0 && checkChunkSize(subTaskId)) {
+      flushChunkToFileWriter(fileWriter, subTaskId);
       CompactionMetricsManager.recordWriteInfo(
           this instanceof CrossSpaceCompactionWriter
               ? CompactionType.CROSS_COMPACTION
               : CompactionType.INNER_UNSEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
           this.isAlign,
-          chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(fileWriter);
+          chunkWriters[subTaskId].estimateMaxSeriesMemSize());
     }
   }
 
-  private boolean checkChunkSize() {
-    if (chunkWriter instanceof AlignedChunkWriterImpl) {
-      return ((AlignedChunkWriterImpl) chunkWriter).checkIsChunkSizeOverThreshold(targetChunkSize);
+  protected boolean checkChunkSize(int subTaskId) {
+    if (chunkWriters[subTaskId] instanceof AlignedChunkWriterImpl) {
+      return ((AlignedChunkWriterImpl) chunkWriters[subTaskId])
+          .checkIsChunkSizeOverThreshold(targetChunkSize);
     } else {
-      return chunkWriter.estimateMaxSeriesMemSize() > targetChunkSize;
+      return chunkWriters[subTaskId].estimateMaxSeriesMemSize() > targetChunkSize;
     }
   }
 
@@ -163,8 +176,5 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
         CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength);
   }
 
-  protected void updateDeviceStartAndEndTime(TsFileResource targetResource, long timestamp) {
-    targetResource.updateStartTime(deviceId, timestamp);
-    targetResource.updateEndTime(deviceId, timestamp);
-  }
+  public abstract List<TsFileIOWriter> getFileIOWriter();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 2ebe22581d..3e245cfc35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.compaction.writer;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
@@ -36,68 +35,73 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
 
-  private int seqFileIndex;
+  // Each sub task has its corresponding seq file index.
+  // The index of the array corresponds to subTaskId.
+  private int[] seqFileIndexArray = new int[subTaskNum];
 
+  // device end time in each source seq file
   private final long[] currentDeviceEndTime;
 
+  // whether each target file is empty or not
   private final boolean[] isEmptyFile;
 
-  private final boolean[] hasTargetFileStartChunkGroup;
+  // whether each target file has device data or not
+  private final boolean[] isDeviceExistedInTargetFiles;
 
-  private final List<TsFileResource> targetTsFileResources;
+  // current chunk group header size
+  private int chunkGroupHeaderSize;
 
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
       throws IOException {
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
-    hasTargetFileStartChunkGroup = new boolean[seqFileResources.size()];
+    isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
     for (int i = 0; i < targetResources.size(); i++) {
-      this.fileWriterList.add(new RestorableTsFileIOWriter(targetResources.get(i).getTsFile()));
+      this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
       isEmptyFile[i] = true;
     }
     this.seqTsFileResources = seqFileResources;
-    this.targetTsFileResources = targetResources;
-    seqFileIndex = 0;
   }
 
   @Override
   public void startChunkGroup(String deviceId, boolean isAlign) throws IOException {
     this.deviceId = deviceId;
     this.isAlign = isAlign;
-    this.seqFileIndex = 0;
+    this.seqFileIndexArray = new int[subTaskNum];
     checkIsDeviceExistAndGetDeviceEndTime();
-    for (int i = 0; i < seqTsFileResources.size(); i++) {
-      hasTargetFileStartChunkGroup[i] = false;
+    for (int i = 0; i < fileWriterList.size(); i++) {
+      chunkGroupHeaderSize = fileWriterList.get(i).startChunkGroup(deviceId);
     }
   }
 
   @Override
   public void endChunkGroup() throws IOException {
     for (int i = 0; i < seqTsFileResources.size(); i++) {
-      if (hasTargetFileStartChunkGroup[i]) {
-        fileWriterList.get(i).endChunkGroup();
+      TsFileIOWriter targetFileWriter = fileWriterList.get(i);
+      if (isDeviceExistedInTargetFiles[i]) {
+        targetFileWriter.endChunkGroup();
+      } else {
+        targetFileWriter.truncate(targetFileWriter.getPos() - chunkGroupHeaderSize);
       }
+      isDeviceExistedInTargetFiles[i] = false;
     }
     deviceId = null;
   }
 
   @Override
-  public void endMeasurement() throws IOException {
-    writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
-    chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex));
-    chunkWriter = null;
-    seqFileIndex = 0;
+  public void endMeasurement(int subTaskId) throws IOException {
+    flushChunkToFileWriter(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
+    seqFileIndexArray[subTaskId] = 0;
   }
 
   @Override
-  public void write(long timestamp, Object value) throws IOException {
-    checkTimeAndMayFlushChunkToCurrentFile(timestamp);
-    checkAndMayStartChunkGroup();
-    writeDataPoint(timestamp, value);
-    updateDeviceStartAndEndTime(targetTsFileResources.get(seqFileIndex), timestamp);
-    checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndex));
-    isEmptyFile[seqFileIndex] = false;
+  public void write(long timestamp, Object value, int subTaskId) throws IOException {
+    checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
+    writeDataPoint(timestamp, value, subTaskId);
+    checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
+    isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
+    isEmptyFile[seqFileIndexArray[subTaskId]] = false;
   }
 
   @Override
@@ -123,16 +127,21 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
     }
     fileWriterList = null;
     seqTsFileResources = null;
-    chunkWriter = null;
   }
 
-  private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp) throws IOException {
+  @Override
+  public List<TsFileIOWriter> getFileIOWriter() {
+    return fileWriterList;
+  }
+
+  private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp, int subTaskId)
+      throws IOException {
+    int fileIndex = seqFileIndexArray[subTaskId];
     // if timestamp is later than the current source seq tsfile, than flush chunk writer
-    while (timestamp > currentDeviceEndTime[seqFileIndex]) {
-      if (seqFileIndex != seqTsFileResources.size() - 1) {
-        writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
-        chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex));
-        seqFileIndex++;
+    while (timestamp > currentDeviceEndTime[fileIndex]) {
+      if (fileIndex != seqTsFileResources.size() - 1) {
+        flushChunkToFileWriter(fileWriterList.get(fileIndex), subTaskId);
+        seqFileIndexArray[subTaskId] = ++fileIndex;
       } else {
         // If the seq file is deleted for various reasons, the following two situations may occur
         // when selecting the source files: (1) unseq files may have some devices or measurements
@@ -168,11 +177,4 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
       fileIndex++;
     }
   }
-
-  private void checkAndMayStartChunkGroup() throws IOException {
-    if (!hasTargetFileStartChunkGroup[seqFileIndex]) {
-      fileWriterList.get(seqFileIndex).startChunkGroup(deviceId);
-      hasTargetFileStartChunkGroup[seqFileIndex] = true;
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index 7b0e31095d..af2cc53c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -19,22 +19,20 @@
 package org.apache.iotdb.db.engine.compaction.writer;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   private TsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
 
-  private final TsFileResource targetTsFileResource;
-
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
-    fileWriter = new RestorableTsFileIOWriter(targetFileResource.getTsFile());
+    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
     isEmptyFile = true;
-    this.targetTsFileResource = targetFileResource;
   }
 
   @Override
@@ -50,17 +48,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
   }
 
   @Override
-  public void endMeasurement() throws IOException {
-    writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
-    chunkWriter.writeToFileWriter(fileWriter);
-    chunkWriter = null;
+  public void endMeasurement(int subTaskId) throws IOException {
+    flushChunkToFileWriter(fileWriter, subTaskId);
   }
 
   @Override
-  public void write(long timestamp, Object value) throws IOException {
-    writeDataPoint(timestamp, value);
-    updateDeviceStartAndEndTime(targetTsFileResource, timestamp);
-    checkChunkSizeAndMayOpenANewChunk(fileWriter);
+  public void write(long timestamp, Object value, int subTaskId) throws IOException {
+    writeDataPoint(timestamp, value, subTaskId);
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
     isEmptyFile = false;
   }
 
@@ -80,7 +75,11 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
     if (fileWriter != null && fileWriter.canWrite()) {
       fileWriter.close();
     }
-    chunkWriter = null;
     fileWriter = null;
   }
+
+  @Override
+  public List<TsFileIOWriter> getFileIOWriter() {
+    return Collections.singletonList(fileWriter);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index a6aab11e1c..69ff88dee0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -30,9 +30,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -202,6 +207,20 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
         CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+    assertEquals(
+        0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+    assertEquals(
+        0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+    assertEquals(
+        250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+    assertEquals(
+        600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+    assertEquals(
+        600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+    for (int i = 0; i < 5; i++) {
+      assertEquals(
+          749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i));
+    }
 
     for (int i = 0; i < 5; i++) {
       for (int j = 0; j < 5; j++) {
@@ -1752,7 +1771,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   }
 
   /**
-   * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+   * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
    *
    * <p>Seq files<br>
    * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -1836,6 +1855,34 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 2; i < 4; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     Map<String, Long> measurementMaxTime = new HashMap<>();
 
     for (int i = 0; i < 4; i++) {
@@ -1905,7 +1952,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   }
 
   /**
-   * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+   * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
    *
    * <p>Seq files<br>
    * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2002,6 +2049,34 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 2; i < 4; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     Map<String, Long> measurementMaxTime = new HashMap<>();
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2072,7 +2147,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   }
 
   /**
-   * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+   * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
    *
    * <p>Seq files<br>
    * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2165,6 +2240,32 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 2; i < 4; i++) {
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     Map<String, Long> measurementMaxTime = new HashMap<>();
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2231,7 +2332,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   }
 
   /**
-   * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+   * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
    *
    * <p>Seq files<br>
    * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2245,8 +2346,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
    * forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
    * 20450 ~ 20549 and 20550 ~ 20649.
    *
-   * <p>The data of d0, d1 and d2 is deleted in each file. Data in the first target file is all
-   * deleted.
+   * <p>The data of d0, d1 and d2 is deleted in each file. All data in the first and second target
+   * files is deleted.
    */
   @Test
   public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
@@ -2320,6 +2421,21 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    Assert.assertEquals(2, targetResources.size());
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     Map<String, Long> measurementMaxTime = new HashMap<>();
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2380,7 +2496,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   }
 
   /**
-   * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+   * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
    *
    * <p>Seq files<br>
    * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2477,6 +2593,48 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    Assert.assertEquals(4, targetResources.size());
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 2; i < 3; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.clear();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 3; i < 4; i++) {
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     Map<String, Long> measurementMaxTime = new HashMap<>();
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2933,6 +3091,35 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     CompactionUtils.compact(seqResources, unseqResources, targetResources);
     CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+    Assert.assertEquals(4, targetResources.size());
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
+    for (int i = 0; i < 2; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
+      Assert.assertFalse(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
+      check(targetResources.get(i), deviceIdList);
+    }
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
+    for (int i = 2; i < 4; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
     for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
         i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
         i++) {
@@ -3433,6 +3620,54 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
           CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
       CompactionUtils.compact(seqResources, unseqResources, targetResources);
       CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+      Assert.assertEquals(4, targetResources.size());
+      List<String> deviceIdList = new ArrayList<>();
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+      for (int i = 0; i < 2; i++) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        check(targetResources.get(i), deviceIdList);
+      }
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+      for (int i = 2; i < 3; i++) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        check(targetResources.get(i), deviceIdList);
+      }
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5");
+      for (int i = 3; i < 4; i++) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5"));
+        check(targetResources.get(i), deviceIdList);
+      }
     } catch (MetadataException
         | IOException
         | WriteProcessException
@@ -3456,6 +3691,35 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
       CompactionUtils.compact(seqResources, unseqResources, targetResources);
       CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
 
+      Assert.assertEquals(2, targetResources.size());
+      List<String> deviceIdList = new ArrayList<>();
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+      for (int i = 0; i < 1; i++) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        check(targetResources.get(i), deviceIdList);
+      }
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+      deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+      for (int i = 1; i < 2; i++) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        check(targetResources.get(i), deviceIdList);
+      }
+
       for (int i = 0; i < 4; i++) {
         for (int j = 0; j < 4; j++) {
           PartialPath path =
@@ -3515,4 +3779,47 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
       CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
     }
   }
+
+  /**
+   * Checks that the target file contains no empty chunk group. The assertion fails if the file
+   * contains a chunk group header whose deviceID is not in deviceIdList.
+   */
+  public void check(TsFileResource targetResource, List<String> deviceIdList) throws IOException {
+    byte marker;
+    try (TsFileSequenceReader reader =
+        new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) {
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            int dataSize = header.getDataSize();
+            reader.position(reader.position() + dataSize);
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            String deviceID = chunkGroupHeader.getDeviceID();
+            if (!deviceIdList.contains(deviceID)) {
+              Assert.fail(
+                  "Target file "
+                      + targetResource.getTsFile().getPath()
+                      + " contains empty chunk group "
+                      + deviceID);
+            }
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            throw new IOException("Unexpected marker " + marker);
+        }
+      }
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index a413419f32..5e0824911e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross;
 
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceMergeFileSelector;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
@@ -108,6 +109,7 @@ public class CrossSpaceCompactionTest {
           TSFileDescriptor.getInstance().getConfig().getCompressor(),
           Collections.emptyMap());
     }
+    CompactionTaskManager.getInstance().start();
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
   }
 
@@ -118,6 +120,7 @@ public class CrossSpaceCompactionTest {
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
     IoTDB.configManager.clear();
+    CompactionTaskManager.getInstance().stop();
     EnvironmentUtils.cleanAllDir();
     Thread.currentThread().setName(oldThreadName);
     new CompactionConfigRestorer().restoreCompactionConfig();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7eb9a95110..2f865f297f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -142,14 +142,14 @@ public class TsFileIOWriter implements AutoCloseable {
     out.write(VERSION_NUMBER_BYTE);
   }
 
-  public void startChunkGroup(String deviceId) throws IOException {
+  public int startChunkGroup(String deviceId) throws IOException { // now returns an int instead of void
     this.currentChunkGroupDeviceId = deviceId;
     if (logger.isDebugEnabled()) {
       logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
     }
     chunkMetadataList = new ArrayList<>();
     ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId);
-    chunkGroupHeader.serializeTo(out.wrapAsStream());
+    return chunkGroupHeader.serializeTo(out.wrapAsStream()); // hands serializeTo's int result to the caller; presumably the header's serialized size in bytes - TODO confirm
   }
 
   /**
@@ -458,6 +458,10 @@ public class TsFileIOWriter implements AutoCloseable {
     out.flush();
   }
 
+  public void truncate(long offset) throws IOException { // thin delegate: forwards offset unchanged to out.truncate
+    out.truncate(offset); // no catch here, so any IOException from out propagates to the caller
+  }
+
   /**
    * this function is only for Test.
    *