You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/07/28 03:36:47 UTC
[iotdb] 01/03: add compression and encoding type check for FastCompactionPerformer
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c3723b632817b118d3fc8c9add66e153137d76b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Jul 26 13:20:14 2023 +0800
add compression and encoding type check for FastCompactionPerformer
---
.../fast/AlignedSeriesCompactionExecutor.java | 20 ++++++++++++++++++++
.../fast/NonAlignedSeriesCompactionExecutor.java | 14 ++++++++++++++
.../executor/fast/SeriesCompactionExecutor.java | 12 ++++++++----
.../executor/fast/element/ChunkMetadataElement.java | 2 ++
.../utils/executor/fast/element/PageElement.java | 4 ++++
5 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 8dd65f28505..1c03d23f092 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -345,6 +345,26 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
.readMemChunk((ChunkMetadata) valueChunkMetadata));
}
chunkMetadataElement.valueChunks = valueChunks;
+ setForceDecoding(chunkMetadataElement);
+ }
+
+ /**
+ * Flags the given element for forced decoding when the compression type or encoding recorded in
+ * the header of its time chunk, or of any of its value chunks, differs from the matching entry
+ * of measurementSchemas. Index 0 of measurementSchemas is compared with the time chunk and
+ * index i (i >= 1) with value chunk i - 1. The flag is only ever set to true here; it is left
+ * untouched when every compared header matches.
+ *
+ * @param chunkMetadataElement element whose chunk and valueChunks fields are already populated
+ */
+ void setForceDecoding(ChunkMetadataElement chunkMetadataElement) {
+ IMeasurementSchema timeChunkSchema = measurementSchemas.get(0);
+ ChunkHeader timeChunkHeader = chunkMetadataElement.chunk.getHeader();
+ if (timeChunkSchema.getCompressor() != timeChunkHeader.getCompressionType()
+ || timeChunkSchema.getEncodingType() != timeChunkHeader.getEncodingType()) {
+ chunkMetadataElement.needForceDecoding = true;
+ return;
+ }
+ for (int i = 1; i < measurementSchemas.size(); i++) {
+ // NOTE(review): a null slot is assumed to mean that this value column has no chunk for the
+ // element (confirm against the code that fills valueChunks). There is no header to compare,
+ // so skip it rather than fail with a NullPointerException.
+ if (chunkMetadataElement.valueChunks.get(i - 1) == null) {
+ continue;
+ }
+ ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 1).getHeader();
+ if (header.getCompressionType() != measurementSchemas.get(i).getCompressor()
+ || header.getEncodingType() != measurementSchemas.get(i).getEncodingType()) {
+ chunkMetadataElement.needForceDecoding = true;
+ return;
+ }
+ }
}
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index 1a075fef39c..3cf146b61f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
@@ -51,6 +53,10 @@ import java.util.Map;
public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
private boolean hasStartMeasurement = false;
+ private CompressionType seriesCompressionType = null;
+
+ private TSEncoding seriesTSEncoding = null;
+
// tsfile resource -> timeseries metadata <startOffset, endOffset>
// used to get the chunk metadatas from tsfile directly according to timeseries metadata offset.
private Map<TsFileResource, Pair<Long, Long>> timeseriesMetadataOffsetMap;
@@ -211,6 +217,14 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor
header.getCompressionType());
compactionWriter.startMeasurement(Collections.singletonList(schema), subTaskId);
hasStartMeasurement = true;
+ seriesCompressionType = header.getCompressionType();
+ seriesTSEncoding = header.getEncodingType();
+ chunkMetadataElement.needForceDecoding = false;
+ } else {
+ ChunkHeader header = chunkMetadataElement.chunk.getHeader();
+ chunkMetadataElement.needForceDecoding =
+ header.getCompressionType() != seriesCompressionType
+ || header.getEncodingType() != seriesTSEncoding;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
index 9cfdabc7f55..1290287c0d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
@@ -144,7 +144,11 @@ public abstract class SeriesCompactionExecutor {
firstChunkMetadataElement.chunkMetadata.getEndTime() >= nextChunkStartTime;
boolean isModified = firstChunkMetadataElement.chunkMetadata.isModified();
- if (isChunkOverlap || isModified) {
+ // read current chunk
+ readChunk(firstChunkMetadataElement);
+ boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding;
+
+ if (isChunkOverlap || isModified || forceDecodingChunk) {
// chunk overlaps, is modified, or needs forced decoding, then deserialize it
summary.chunkOverlapOrModified++;
compactWithOverlapChunks(firstChunkMetadataElement);
@@ -164,7 +168,6 @@ public abstract class SeriesCompactionExecutor {
*/
private void compactWithOverlapChunks(ChunkMetadataElement overlappedChunkMetadata)
throws IOException, PageException, WriteProcessException, IllegalPathException {
- readChunk(overlappedChunkMetadata);
deserializeChunkIntoPageQueue(overlappedChunkMetadata);
compactPages();
@@ -176,7 +179,6 @@ public abstract class SeriesCompactionExecutor {
*/
private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElement)
throws IOException, PageException, WriteProcessException, IllegalPathException {
- readChunk(chunkMetadataElement);
boolean success;
if (isAligned) {
success =
@@ -237,7 +239,9 @@ public abstract class SeriesCompactionExecutor {
firstPageElement.pageHeader.getEndTime() >= nextPageStartTime
|| firstPageElement.pageHeader.getEndTime() >= nextChunkStartTime;
- if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) {
+ if (isPageOverlap
+ || modifiedStatus == ModifiedStatus.PARTIAL_DELETED
+ || firstPageElement.needForceDecoding) {
// page overlaps, is partially deleted, or needs forced decoding, then deserialize it
summary.pageOverlapOrModified += 1;
pointPriorityReader.addNewPage(firstPageElement);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
index bcebf074e9f..fafdef50f68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
@@ -40,6 +40,8 @@ public class ChunkMetadataElement {
public List<Chunk> valueChunks;
+ public boolean needForceDecoding;
+
public ChunkMetadataElement(
IChunkMetadata chunkMetadata, long priority, boolean isLastChunk, FileElement fileElement) {
this.chunkMetadata = chunkMetadata;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
index c86744ddec5..0977c9d6a44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java
@@ -56,6 +56,8 @@ public class PageElement {
public ChunkMetadataElement chunkMetadataElement;
+ public boolean needForceDecoding;
+
public PageElement(
PageHeader pageHeader,
ByteBuffer pageData,
@@ -70,6 +72,7 @@ public class PageElement {
this.startTime = pageHeader.getStartTime();
this.chunkMetadataElement = chunkMetadataElement;
this.isLastPage = isLastPage;
+ this.needForceDecoding = chunkMetadataElement.needForceDecoding;
}
@SuppressWarnings("squid:S107")
@@ -91,6 +94,7 @@ public class PageElement {
this.startTime = pageHeader.getStartTime();
this.chunkMetadataElement = chunkMetadataElement;
this.isLastPage = isLastPage;
+ this.needForceDecoding = chunkMetadataElement.needForceDecoding;
}
public void deserializePage() throws IOException {