Posted to commits@iotdb.apache.org by ja...@apache.org on 2024/03/22 13:54:00 UTC

(iotdb) branch master updated: Allow series data type not consistent in compaction

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9feffb1510d Allow series data type not consistent in compaction
9feffb1510d is described below

commit 9feffb1510d3b4b2d0b5a21cc9f3ef811fe619aa
Author: shuwenwei <55...@users.noreply.github.com>
AuthorDate: Fri Mar 22 21:53:55 2024 +0800

    Allow series data type not consistent in compaction
---
 .../impl/ReadChunkCompactionPerformer.java         |  47 +++++
 .../execute/utils/MultiTsFileDeviceIterator.java   |  16 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |  22 ++-
 .../ReadChunkAlignedSeriesCompactionExecutor.java  |  13 +-
 .../compaction/io/CompactionTsFileReader.java      |  51 +++++
 .../compaction/CompactionDataTypeNotMatchTest.java | 212 +++++++++++++++++++++
 ...nkCompactionPerformerWithAlignedSeriesTest.java |   2 +-
 7 files changed, 354 insertions(+), 9 deletions(-)
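
Taken together, these changes let compaction tolerate a series whose data type differs across the source files: broadly, the type found in the newest file wins and chunks carrying a stale type are skipped, as exercised by the new CompactionDataTypeNotMatchTest below (s1 written as BOOLEAN at time 1 and as INT32 at time 2 compacts into a target file whose start time for the device is 2). The following is a minimal standalone sketch of that selection rule as it appears in filterDataTypeNotMatchedChunkMetadata(); DataType and FileChunks here are made-up stand-ins, not IoTDB classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the rule in filterDataTypeNotMatchedChunkMetadata():
// scan the per-file chunk lists from newest to oldest, take the first data type
// found as the "correct" one, then keep only the files whose chunks all use it.
public class DataTypeFilterSketch {

  enum DataType { BOOLEAN, INT32 }

  static class FileChunks {
    final String file;
    final List<DataType> chunkTypes;

    FileChunks(String file, List<DataType> chunkTypes) {
      this.file = file;
      this.chunkTypes = chunkTypes;
    }
  }

  static List<FileChunks> keepLatestType(List<FileChunks> filesOldestFirst) {
    DataType correctType = null;
    // newest file wins: walk backwards until a non-null chunk type is found
    for (int i = filesOldestFirst.size() - 1; i >= 0 && correctType == null; i--) {
      for (DataType type : filesOldestFirst.get(i).chunkTypes) {
        if (type != null) {
          correctType = type;
          break;
        }
      }
    }
    if (correctType == null) {
      return filesOldestFirst; // nothing to compare against
    }
    List<FileChunks> result = new ArrayList<>();
    for (FileChunks f : filesOldestFirst) {
      boolean consistent = true;
      for (DataType type : f.chunkTypes) {
        if (type != null && type != correctType) {
          consistent = false; // stale type, this file is skipped for the series
          break;
        }
      }
      if (consistent) {
        result.add(f);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<FileChunks> files =
        Arrays.asList(
            new FileChunks("old.tsfile", Arrays.asList(DataType.BOOLEAN)), // dropped
            new FileChunks("new.tsfile", Arrays.asList(DataType.INT32)));  // kept
    System.out.println(keepLatestType(files).size()); // prints 1
  }
}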

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index dd8ad9b00e5..569d97910fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -184,6 +185,9 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
       // dead-loop.
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
+      // remove the chunk metadata whose data type does not match that of the most recent chunk
+      readerAndChunkMetadataList =
+          filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
           new SingleSeriesCompactionExecutor(
               p, readerAndChunkMetadataList, writer, targetResource, summary);
@@ -192,6 +196,49 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     writer.endChunkGroup();
   }
 
+  private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+      filterDataTypeNotMatchedChunkMetadata(
+          LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList) {
+    if (readerAndChunkMetadataList.isEmpty()) {
+      return readerAndChunkMetadataList;
+    }
+    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new LinkedList<>();
+    // find the correct data type by scanning from the newest file backwards
+    TSDataType correctDataType = null;
+    for (int i = readerAndChunkMetadataList.size() - 1; i >= 0 && correctDataType == null; i--) {
+      List<ChunkMetadata> chunkMetadataList = readerAndChunkMetadataList.get(i).getRight();
+      if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
+        continue;
+      }
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        if (chunkMetadata == null) {
+          continue;
+        }
+        correctDataType = chunkMetadata.getDataType();
+        break;
+      }
+    }
+    if (correctDataType == null) {
+      return readerAndChunkMetadataList;
+    }
+    // check data type consistency and skip files whose chunks have a mismatched data type
+    for (Pair<TsFileSequenceReader, List<ChunkMetadata>> tsFileSequenceReaderListPair :
+        readerAndChunkMetadataList) {
+      boolean dataTypeConsistent = true;
+      for (ChunkMetadata chunkMetadata : tsFileSequenceReaderListPair.getRight()) {
+        if (chunkMetadata != null && chunkMetadata.getDataType() != correctDataType) {
+          dataTypeConsistent = false;
+          break;
+        }
+      }
+      if (!dataTypeConsistent) {
+        continue;
+      }
+      result.add(tsFileSequenceReaderListPair);
+    }
+    return result;
+  }
+
   @Override
   public void setSourceFiles(List<TsFileResource> seqFiles) {
     this.seqFiles = seqFiles;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index d80ba2fab37..c6241c99a9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -247,6 +248,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
       getTimeseriesMetadataOffsetOfCurrentDevice() throws IOException {
     Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
         new HashMap<>();
+    Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
     for (TsFileResource resource : tsFileResourcesSortedByDesc) {
       if (!deviceIteratorMap.containsKey(resource)
           || !deviceIteratorMap.get(resource).current().equals(currentDevice)) {
@@ -255,14 +257,22 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
         continue;
       }
       TsFileSequenceReader reader = readerMap.get(resource);
-      for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> entrySet :
-          reader
-              .getTimeseriesMetadataOffsetByDevice(
+      for (Map.Entry<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> entrySet :
+          ((CompactionTsFileReader) reader)
+              .getTimeseriesMetadataAndOffsetByDevice(
                   deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(),
                   Collections.emptySet(),
                   false)
               .entrySet()) {
         String measurementId = entrySet.getKey();
+        // skip TimeseriesMetadata whose data type is inconsistent with the type first seen for this measurement
+        TSDataType dataTypeOfCurrentTimeseriesMetadata = entrySet.getValue().left.getTsDataType();
+        TSDataType correctDataTypeOfCurrentMeasurement =
+            measurementDataTypeMap.putIfAbsent(measurementId, dataTypeOfCurrentTimeseriesMetadata);
+        if (correctDataTypeOfCurrentMeasurement != null
+            && correctDataTypeOfCurrentMeasurement != dataTypeOfCurrentTimeseriesMetadata) {
+          continue;
+        }
         timeseriesMetadataOffsetMap.putIfAbsent(measurementId, new HashMap<>());
         timeseriesMetadataOffsetMap.get(measurementId).put(resource, entrySet.getValue().right);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index d3a26201787..9cfd7cee948 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -169,11 +169,16 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
             valueChunkMetadatas.add(null);
           } else {
             // current file contains this aligned timeseries
-            valueChunkMetadatas.add(
+            List<IChunkMetadata> valueColumnChunkMetadataList =
                 readerCacheMap
                     .get(resource)
                     .getChunkMetadataListByTimeseriesMetadataOffset(
-                        timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right));
+                        timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right);
+            if (isValueChunkDataTypeMatchSchema(valueColumnChunkMetadataList)) {
+              valueChunkMetadatas.add(valueColumnChunkMetadataList);
+            } else {
+              valueChunkMetadatas.add(null);
+            }
           }
         }
       }
@@ -239,6 +244,19 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
     }
   }
 
+  private boolean isValueChunkDataTypeMatchSchema(
+      List<IChunkMetadata> chunkMetadataListOfOneValueColumn) {
+    for (IChunkMetadata chunkMetadata : chunkMetadataListOfOneValueColumn) {
+      if (chunkMetadata == null) {
+        continue;
+      }
+      String measurement = chunkMetadata.getMeasurementUid();
+      IMeasurementSchema schema = measurementSchemaMap.get(measurement);
+      return schema.getType() == chunkMetadata.getDataType();
+    }
+    return true;
+  }
+
   /**
    * Deserialize chunk into pages without uncompressing and put them into the page queue.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index 29768e6ff5e..d5f0785980d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -103,8 +103,9 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
 
   private void collectValueColumnSchemaList() throws IOException {
     Map<String, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
-    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
-        this.readerAndChunkMetadataList) {
+    for (int i = this.readerAndChunkMetadataList.size() - 1; i >= 0; i--) {
+      Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair =
+          this.readerAndChunkMetadataList.get(i);
       CompactionTsFileReader reader = (CompactionTsFileReader) pair.getLeft();
       List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
       for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
@@ -184,7 +185,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     Collections.fill(valueChunks, getChunkLoader(reader, null));
     long pointNum = 0;
     for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
-      if (chunkMetadata == null) {
+      if (chunkMetadata == null || !isValueChunkDataTypeMatchSchema(chunkMetadata)) {
         continue;
       }
       pointNum += chunkMetadata.getStatistics().getCount();
@@ -201,6 +202,12 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     }
   }
 
+  private boolean isValueChunkDataTypeMatchSchema(IChunkMetadata valueChunkMetadata) {
+    String measurement = valueChunkMetadata.getMeasurementUid();
+    IMeasurementSchema schema = schemaList.get(measurementSchemaListIndexMap.get(measurement));
+    return schema.getType() == valueChunkMetadata.getDataType();
+  }
+
   private ChunkLoader getChunkLoader(TsFileSequenceReader reader, ChunkMetadata chunkMetadata)
       throws IOException {
     if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 476c8d19c89..f4231d378de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.tsfile.file.IMetadataIndexEntry;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
 import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -36,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -182,6 +185,54 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
     return result;
   }
 
+  public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
+      getTimeseriesMetadataAndOffsetByDevice(
+          MetadataIndexNode measurementNode,
+          Set<String> excludedMeasurementIds,
+          boolean needChunkMetadata)
+          throws IOException {
+    long before = readDataSize.get();
+    Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
+        new LinkedHashMap<>();
+    List<IMetadataIndexEntry> childrenEntryList = measurementNode.getChildren();
+    for (int i = 0; i < childrenEntryList.size(); i++) {
+      long startOffset = childrenEntryList.get(i).getOffset();
+      long endOffset =
+          i == childrenEntryList.size() - 1
+              ? measurementNode.getEndOffset()
+              : childrenEntryList.get(i + 1).getOffset();
+      ByteBuffer nextBuffer = readData(startOffset, endOffset);
+      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+        // leaf measurement node
+        while (nextBuffer.hasRemaining()) {
+          int metadataStartOffset = nextBuffer.position();
+          TimeseriesMetadata timeseriesMetadata =
+              TimeseriesMetadata.deserializeFrom(
+                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
+          timeseriesMetadataOffsetMap.put(
+              timeseriesMetadata.getMeasurementId(),
+              new Pair<>(
+                  timeseriesMetadata,
+                  new Pair<>(
+                      startOffset + metadataStartOffset, startOffset + nextBuffer.position())));
+        }
+
+      } else {
+        // internal measurement node
+        MetadataIndexNode nextLayerMeasurementNode =
+            MetadataIndexNode.deserializeFrom(nextBuffer, false);
+        timeseriesMetadataOffsetMap.putAll(
+            getTimeseriesMetadataAndOffsetByDevice(
+                nextLayerMeasurementNode, excludedMeasurementIds, needChunkMetadata));
+      }
+    }
+
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize);
+    return timeseriesMetadataOffsetMap;
+  }
+
   @Override
   public void getDeviceTimeseriesMetadata(
       List<TimeseriesMetadata> timeseriesMetadataList,
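
The new getTimeseriesMetadataAndOffsetByDevice() above returns, per measurement, both the deserialized TimeseriesMetadata and its absolute byte range, reconstructed from the block's start offset plus the buffer position before and after deserializing each entry. A rough standalone sketch of that offset bookkeeping, using a made-up length-prefixed entry format instead of the real TsFile layout:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// The absolute span of each entry inside a larger block is recovered from the
// buffer position before and after reading it, offset by the block's start.
public class OffsetTrackingSketch {
  public static void main(String[] args) {
    long blockStartOffset = 1000; // where the block begins in the file
    ByteBuffer block = ByteBuffer.allocate(32);
    for (String s : new String[] {"s1", "s2"}) {
      byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
      block.putInt(bytes.length).put(bytes);
    }
    block.flip();
    while (block.hasRemaining()) {
      int entryStart = block.position();
      int len = block.getInt();
      byte[] bytes = new byte[len];
      block.get(bytes);
      long absStart = blockStartOffset + entryStart;
      long absEnd = blockStartOffset + block.position();
      System.out.println(
          new String(bytes, StandardCharsets.UTF_8) + " @ [" + absStart + ", " + absEnd + ")");
    }
  }
}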
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java
new file mode 100644
index 00000000000..c07616a4ce5
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompactionDataTypeNotMatchTest extends AbstractCompactionTest {
+  private final String oldThreadName = Thread.currentThread().getName();
+  private final IDeviceID device = new PlainDeviceID(COMPACTION_TEST_SG + ".d1");
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+    Thread.currentThread().setName(oldThreadName);
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithReadChunkCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithFastCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithReadChunkCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithFastCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithReadPointCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0);
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  private void generateDataTypeNotMatchFilesWithNonAlignedSeries()
+      throws IOException, WriteProcessException {
+    MeasurementSchema measurementSchema1 = new MeasurementSchema("s1", TSDataType.BOOLEAN);
+    TsFileResource resource1 = createEmptyFileAndResource(true);
+    resource1.setStatusForTest(TsFileResourceStatus.COMPACTING);
+    try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) {
+      writer.registerTimeseries(new Path(device), measurementSchema1);
+      TSRecord record = new TSRecord(1, device);
+      record.addTuple(new BooleanDataPoint("s1", true));
+      writer.write(record);
+      writer.flushAllChunkGroups();
+    }
+    resource1.updateStartTime(device, 1);
+    resource1.updateEndTime(device, 1);
+    resource1.serialize();
+    seqResources.add(resource1);
+
+    MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.INT32);
+    TsFileResource resource2 = createEmptyFileAndResource(true);
+    resource2.setStatusForTest(TsFileResourceStatus.COMPACTING);
+    try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) {
+      writer.registerTimeseries(new Path(device), measurementSchema2);
+      TSRecord record = new TSRecord(2, device);
+      record.addTuple(new IntDataPoint("s1", 10));
+      writer.write(record);
+      writer.flushAllChunkGroups();
+    }
+    resource2.updateStartTime(device, 2);
+    resource2.updateEndTime(device, 2);
+    resource2.serialize();
+    seqResources.add(resource2);
+  }
+
+  private void generateDataTypeNotMatchFilesWithAlignedSeries()
+      throws IOException, WriteProcessException {
+    List<MeasurementSchema> measurementSchemas1 = new ArrayList<>();
+    measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32));
+    measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32));
+
+    TsFileResource resource1 = createEmptyFileAndResource(true);
+    resource1.setStatusForTest(TsFileResourceStatus.COMPACTING);
+    try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) {
+      writer.registerAlignedTimeseries(new Path(device), measurementSchemas1);
+      TSRecord record = new TSRecord(1, device);
+      record.addTuple(new IntDataPoint("s1", 0));
+      record.addTuple(new IntDataPoint("s2", 1));
+      writer.writeAligned(record);
+      writer.flushAllChunkGroups();
+    }
+    resource1.updateStartTime(device, 1);
+    resource1.updateEndTime(device, 1);
+    resource1.serialize();
+    seqResources.add(resource1);
+
+    List<MeasurementSchema> measurementSchemas2 = new ArrayList<>();
+    measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.BOOLEAN));
+    measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.BOOLEAN));
+    TsFileResource resource2 = createEmptyFileAndResource(true);
+    resource2.setStatusForTest(TsFileResourceStatus.COMPACTING);
+    try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) {
+      writer.registerAlignedTimeseries(new Path(device), measurementSchemas2);
+      TSRecord record = new TSRecord(2, device);
+      record.addTuple(new BooleanDataPoint("s1", true));
+      record.addTuple(new BooleanDataPoint("s2", true));
+      writer.writeAligned(record);
+      writer.flushAllChunkGroups();
+    }
+    resource2.updateStartTime(device, 2);
+    resource2.updateEndTime(device, 2);
+    resource2.serialize();
+    seqResources.add(resource2);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
index 5b840c8dad8..8c925424b38 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
@@ -625,7 +625,7 @@ public class NewReadChunkCompactionPerformerWithAlignedSeriesTest extends Abstra
     performer.perform();
     CompactionUtils.moveTargetFile(
         Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
-    Assert.assertEquals(8, summary.getDirectlyFlushChunkNum());
+    Assert.assertEquals(16, summary.getDirectlyFlushChunkNum());
     Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
     TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
     Assert.assertEquals(