You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/11 07:59:34 UTC

[iotdb] branch IOTDB-5662-0.13 created (now 76e3b8d15b)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-5662-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 76e3b8d15b fix exception when chunk compression type or encoding type is not the same

This branch includes the following new commits:

     new 76e3b8d15b fix exception when chunk compression type or encoding type is not the same

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix exception when chunk compression type or encoding type is not the same

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5662-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76e3b8d15b1bb9b4589e331460149e2b68a418c6
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Mar 11 15:59:18 2023 +0800

    fix exception when chunk compression type or encoding type is not the same
---
 .../utils/SingleSeriesCompactionExecutor.java      | 33 +++++++++++++++-------
 1 file changed, 23 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..8da67ecb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -166,15 +166,18 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void processLargeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+      // if there is a cached chunk, merge it with current chunk, then flush it
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk, then flush it
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
       // there is no points remaining in ChunkWriter and no cached chunk
       // flush it to file directly
@@ -182,17 +185,27 @@ public class SingleSeriesCompactionExecutor {
     }
   }
 
+  private boolean canMergeChunk(Chunk chunk1, Chunk chunk2) {
+    ChunkHeader header1 = chunk1.getHeader();
+    ChunkHeader header2 = chunk2.getHeader();
+    return (header1.getCompressionType() == header2.getCompressionType())
+        && (header1.getEncodingType() == header2.getEncodingType());
+  }
+
   private void processMiddleChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
     // the chunk is not too large either too small
-    if (pointCountInChunkWriter != 0L) {
+    if (cachedChunk != null && canMergeChunk(cachedChunk, chunk)) {
+      // if there is a cached chunk, merge it with current chunk
+      mergeWithCachedChunk(chunk, chunkMetadata);
+      flushCachedChunkIfLargeEnough();
+    } else if (cachedChunk != null || pointCountInChunkWriter != 0L) {
+      if (cachedChunk != null) {
+        writeCachedChunkIntoChunkWriter();
+      }
       // if there are points remaining in ChunkWriter
       // deserialize current chunk and write to ChunkWriter
       writeChunkIntoChunkWriter(chunk);
       flushChunkWriterIfLargeEnough();
-    } else if (cachedChunk != null) {
-      // if there is a cached chunk, merge it with current chunk
-      mergeWithCachedChunk(chunk, chunkMetadata);
-      flushCachedChunkIfLargeEnough();
     } else {
       // there is no points remaining in ChunkWriter and no cached chunk
       // cached current chunk