You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/07/28 03:36:47 UTC

[iotdb] 01/03: add compression and encoding type check for FastCompactionPerformer

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c3723b632817b118d3fc8c9add66e153137d76b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Jul 26 13:20:14 2023 +0800

    add compression and encoding type check for FastCompactionPerformer
---
 .../fast/AlignedSeriesCompactionExecutor.java        | 20 ++++++++++++++++++++
 .../fast/NonAlignedSeriesCompactionExecutor.java     | 14 ++++++++++++++
 .../executor/fast/SeriesCompactionExecutor.java      | 12 ++++++++----
 .../executor/fast/element/ChunkMetadataElement.java  |  2 ++
 .../utils/executor/fast/element/PageElement.java     |  4 ++++
 5 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 8dd65f28505..1c03d23f092 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -345,6 +345,26 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
               .readMemChunk((ChunkMetadata) valueChunkMetadata));
     }
     chunkMetadataElement.valueChunks = valueChunks;
+    setForceDecoding(chunkMetadataElement);
+  }
+
+  /**
+   * Flags the given chunk as needing forced decoding when any of its chunk headers disagrees
+   * with the corresponding measurement schema on compression type or encoding.
+   *
+   * <p>The time chunk header is compared against {@code measurementSchemas.get(0)}; the value
+   * chunk at index {@code i - 1} is compared against {@code measurementSchemas.get(i)}. On the
+   * first mismatch {@code needForceDecoding} is set to {@code true} and the method returns; if
+   * nothing mismatches, the flag is left untouched.
+   *
+   * @param chunkMetadataElement element whose {@code chunk} and {@code valueChunks} have already
+   *     been populated
+   */
+  void setForceDecoding(ChunkMetadataElement chunkMetadataElement) {
+    IMeasurementSchema timeChunkSchema = measurementSchemas.get(0);
+    ChunkHeader timeChunkHeader = chunkMetadataElement.chunk.getHeader();
+    if (timeChunkSchema.getCompressor() != timeChunkHeader.getCompressionType()
+        || timeChunkSchema.getEncodingType() != timeChunkHeader.getEncodingType()) {
+      chunkMetadataElement.needForceDecoding = true;
+      return;
+    }
+    for (int i = 1; i < measurementSchemas.size(); i++) {
+      // NOTE(review): valueChunks presumably holds a null placeholder for a value column that has
+      // no chunk here (e.g. fully deleted) -- confirm against readChunk. Such a slot has no
+      // header to compare, so skip it instead of failing with a NullPointerException.
+      if (chunkMetadataElement.valueChunks.get(i - 1) == null) {
+        continue;
+      }
+      ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 1).getHeader();
+      if (header.getCompressionType() != measurementSchemas.get(i).getCompressor()
+          || header.getEncodingType() != measurementSchemas.get(i).getEncodingType()) {
+        chunkMetadataElement.needForceDecoding = true;
+        return;
+      }
+    }
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index 1a075fef39c..3cf146b61f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
@@ -51,6 +53,10 @@ import java.util.Map;
 public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
   private boolean hasStartMeasurement = false;
 
+  private CompressionType seriesCompressionType = null;
+
+  private TSEncoding seriesTSEncoding = null;
+
   // tsfile resource -> timeseries metadata <startOffset, endOffset>
   // used to get the chunk metadatas from tsfile directly according to timeseries metadata offset.
   private Map<TsFileResource, Pair<Long, Long>> timeseriesMetadataOffsetMap;
@@ -211,6 +217,14 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
               header.getCompressionType());
       compactionWriter.startMeasurement(Collections.singletonList(schema), subTaskId);
       hasStartMeasurement = true;
+      seriesCompressionType = header.getCompressionType();
+      seriesTSEncoding = header.getEncodingType();
+      chunkMetadataElement.needForceDecoding = false;
+    } else {
+      ChunkHeader header = chunkMetadataElement.chunk.getHeader();
+      chunkMetadataElement.needForceDecoding =
+          header.getCompressionType() != seriesCompressionType
+              || header.getEncodingType() != seriesTSEncoding;
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
index 9cfdabc7f55..1290287c0d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
@@ -144,7 +144,11 @@ public abstract class SeriesCompactionExecutor {
           firstChunkMetadataElement.chunkMetadata.getEndTime() >= nextChunkStartTime;
       boolean isModified = firstChunkMetadataElement.chunkMetadata.isModified();
 
-      if (isChunkOverlap || isModified) {
+      // read current chunk
+      readChunk(firstChunkMetadataElement);
+      boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding;
+
+      if (isChunkOverlap || isModified || forceDecodingChunk) {
         // has overlap or modified chunk, then deserialize it
         summary.chunkOverlapOrModified++;
         compactWithOverlapChunks(firstChunkMetadataElement);
@@ -164,7 +168,6 @@ public abstract class SeriesCompactionExecutor {
    */
   private void compactWithOverlapChunks(ChunkMetadataElement overlappedChunkMetadata)
       throws IOException, PageException, WriteProcessException, IllegalPathException {
-    readChunk(overlappedChunkMetadata);
     deserializeChunkIntoPageQueue(overlappedChunkMetadata);
 
     compactPages();
@@ -176,7 +179,6 @@ public abstract class SeriesCompactionExecutor {
    */
   private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElement)
       throws IOException, PageException, WriteProcessException, IllegalPathException {
-    readChunk(chunkMetadataElement);
     boolean success;
     if (isAligned) {
       success =
@@ -237,7 +239,9 @@ public abstract class SeriesCompactionExecutor {
           firstPageElement.pageHeader.getEndTime() >= nextPageStartTime
               || firstPageElement.pageHeader.getEndTime() >= nextChunkStartTime;
 
-      if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) {
+      if (isPageOverlap
+          || modifiedStatus == ModifiedStatus.PARTIAL_DELETED
+          || firstPageElement.needForceDecoding) {
         // has overlap or modified pages, then deserialize it
         summary.pageOverlapOrModified += 1;
         pointPriorityReader.addNewPage(firstPageElement);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
index bcebf074e9f..fafdef50f68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
@@ -40,6 +40,8 @@ public class ChunkMetadataElement {
 
   public List<Chunk> valueChunks;
 
+  public boolean needForceDecoding;
+
   public ChunkMetadataElement(
       IChunkMetadata chunkMetadata, long priority, boolean isLastChunk, FileElement fileElement) {
     this.chunkMetadata = chunkMetadata;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
index c86744ddec5..0977c9d6a44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
@@ -56,6 +56,8 @@ public class PageElement {
 
   public ChunkMetadataElement chunkMetadataElement;
 
+  public boolean needForceDecoding;
+
   public PageElement(
       PageHeader pageHeader,
       ByteBuffer pageData,
@@ -70,6 +72,7 @@ public class PageElement {
     this.startTime = pageHeader.getStartTime();
     this.chunkMetadataElement = chunkMetadataElement;
     this.isLastPage = isLastPage;
+    this.needForceDecoding = chunkMetadataElement.needForceDecoding;
   }
 
   @SuppressWarnings("squid:S107")
@@ -91,6 +94,7 @@ public class PageElement {
     this.startTime = pageHeader.getStartTime();
     this.chunkMetadataElement = chunkMetadataElement;
     this.isLastPage = isLastPage;
+    this.needForceDecoding = chunkMetadataElement.needForceDecoding;
   }
 
   public void deserializePage() throws IOException {