You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/11 07:59:35 UTC
[iotdb] 01/01: fix exception when chunk compression type or encoding type is not the same
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-5662-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 76e3b8d15b1bb9b4589e331460149e2b68a418c6
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Mar 11 15:59:18 2023 +0800
fix exception when chunk compression type or encoding type is not the same
---
.../utils/SingleSeriesCompactionExecutor.java | 33 +++++++++++++++-------
1 file changed, 23 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..8da67ecb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -166,15 +166,18 @@ public class SingleSeriesCompactionExecutor {
}
private void processLargeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
- if (pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+ // if there is a cached chunk, merge it with current chunk, then flush it
+ mergeWithCachedChunk(chunk, chunkMetadata);
+ flushCachedChunkIfLargeEnough();
+ } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null) {
+ writeCachedChunkIntoChunkWriter();
+ }
// if there are points remaining in ChunkWriter
// deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter
writeChunkIntoChunkWriter(chunk);
flushChunkWriterIfLargeEnough();
- } else if (cachedChunk != null) {
- // if there is a cached chunk, merge it with current chunk, then flush it
- mergeWithCachedChunk(chunk, chunkMetadata);
- flushCachedChunkIfLargeEnough();
} else {
// there is no points remaining in ChunkWriter and no cached chunk
// flush it to file directly
@@ -182,17 +185,27 @@ public class SingleSeriesCompactionExecutor {
}
}
+ private boolean canMergeChunk(Chunk chunk1, Chunk chunk2) {
+ ChunkHeader header1 = chunk1.getHeader();
+ ChunkHeader header2 = chunk2.getHeader();
+ return (header1.getCompressionType() == header2.getCompressionType())
+ && (header1.getEncodingType() == header2.getEncodingType());
+ }
+
private void processMiddleChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
// the chunk is not too large either too small
- if (pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+ // if there is a cached chunk, merge it with current chunk
+ mergeWithCachedChunk(chunk, chunkMetadata);
+ flushCachedChunkIfLargeEnough();
+ } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+ if (cachedChunk != null) {
+ writeCachedChunkIntoChunkWriter();
+ }
// if there are points remaining in ChunkWriter
// deserialize current chunk and write to ChunkWriter
writeChunkIntoChunkWriter(chunk);
flushChunkWriterIfLargeEnough();
- } else if (cachedChunk != null) {
- // if there is a cached chunk, merge it with current chunk
- mergeWithCachedChunk(chunk, chunkMetadata);
- flushCachedChunkIfLargeEnough();
} else {
// there is no points remaining in ChunkWriter and no cached chunk
// cached current chunk