Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/11 07:59:35 UTC

[iotdb] 01/01: fix exception when chunk compression type or encoding type is not the same

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5662-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76e3b8d15b1bb9b4589e331460149e2b68a418c6
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Mar 11 15:59:18 2023 +0800

    fix exception when chunk compression type or encoding type is not the same
---
 .../utils/SingleSeriesCompactionExecutor.java      | 33 +++++++++++++++-------
 1 file changed, 23 insertions(+), 10 deletions(-)
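
Context for the diff below: the executor keeps at most one cached chunk plus a ChunkWriter holding pending points. Before this fix, a cached chunk was merged byte-for-byte with the incoming chunk whenever the ChunkWriter was empty, even when the two chunks had been written with different compression or encoding settings, which raised an exception during inner compaction. The fix checks header compatibility first and routes incompatible chunks through the ChunkWriter (deserialize and re-encode) instead. A minimal, self-contained sketch of that compatibility check follows; CanMergeDemo, MiniChunk, and the trimmed enums are illustrative stand-ins, not the real IoTDB types:

    // Illustrative stand-ins for the real header types; only the check matters.
    public class CanMergeDemo {
      enum CompressionType { UNCOMPRESSED, SNAPPY, LZ4 }
      enum TSEncoding { PLAIN, RLE, TS_2DIFF }

      static class MiniChunk {
        final CompressionType compression;
        final TSEncoding encoding;
        MiniChunk(CompressionType c, TSEncoding e) { compression = c; encoding = e; }
      }

      // Mirrors the canMergeChunk helper added in the diff: two chunks are
      // only byte-level mergeable when both header fields match exactly.
      static boolean canMerge(MiniChunk a, MiniChunk b) {
        return a.compression == b.compression && a.encoding == b.encoding;
      }

      public static void main(String[] args) {
        MiniChunk cached = new MiniChunk(CompressionType.SNAPPY, TSEncoding.PLAIN);
        MiniChunk current = new MiniChunk(CompressionType.LZ4, TSEncoding.PLAIN);
        // Mismatched compression: must fall back to the ChunkWriter path.
        System.out.println(canMerge(cached, current)); // prints: false
      }
    }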

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..8da67ecb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -166,15 +166,18 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void processLargeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+      // if there is a cached chunk with the same compression and encoding, merge it with the current chunk, then flush it
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk, then flush it
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
      // there are no points remaining in ChunkWriter and no cached chunk
       // flush it to file directly
@@ -182,17 +185,27 @@ public class SingleSeriesCompactionExecutor {
     }
   }
 
+  private boolean canMergeChunk(Chunk chunk1, Chunk chunk2) {
+    ChunkHeader header1 = chunk1.getHeader();
+    ChunkHeader header2 = chunk2.getHeader();
+    return (header1.getCompressionType() == header2.getCompressionType())
+        && (header1.getEncodingType() == header2.getEncodingType());
+  }
+
   private void processMiddleChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
    // the chunk is neither too large nor too small
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+      // if there is a cached chunk with the same compression and encoding, merge it with the current chunk
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
      // there are no points remaining in ChunkWriter and no cached chunk
      // cache the current chunk
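
Taken together, processLargeChunk and processMiddleChunk now share the same branch order: try the raw merge first (only when canMergeChunk confirms matching compression and encoding), otherwise drain the cached chunk into the ChunkWriter via writeCachedChunkIntoChunkWriter so it is re-encoded along with the current chunk, and fall through to the direct path only when nothing is pending. A condensed sketch of that shared order, using the field and helper names from the diff above (the surrounding executor state is assumed):

    // Condensed sketch, not a drop-in replacement; names are from the diff.
    if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
      mergeWithCachedChunk(chunk, chunkMetadata); // headers match: raw merge
      flushCachedChunkIfLargeEnough();
    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
      if (cachedChunk != null) {
        writeCachedChunkIntoChunkWriter(); // incompatible cache: re-encode it
      }
      writeChunkIntoChunkWriter(chunk); // deserialize the current chunk
      flushChunkWriterIfLargeEnough();
    } else {
      // nothing pending: flush directly (large chunk) or cache it (middle chunk)
    }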