You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/01 14:58:45 UTC

[iotdb] branch master updated: [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new be883fd  [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
be883fd is described below

commit be883fd8ccca035e627317ba75839041109fcf47
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Fri Apr 1 22:56:46 2022 +0800

    [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
---
 .../manage/CrossSpaceCompactionResource.java       |  3 ++-
 .../sizetiered/SizeTieredCompactionSelector.java   |  4 +--
 .../db/engine/storagegroup/TsFileResource.java     |  4 +++
 .../inner/InnerCompactionSchedulerTest.java        | 31 ++++++++++++++++++++++
 4 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index 7ced2fb..fecdd78 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
 
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -80,7 +81,7 @@ public class CrossSpaceCompactionResource {
    */
   private void filterUnseqResource(List<TsFileResource> unseqResources) {
     for (TsFileResource resource : unseqResources) {
-      if (resource.isCompacting() || resource.isCompactionCandidate() || !resource.isClosed()) {
+      if (resource.getStatus() != TsFileResourceStatus.CLOSED) {
         return;
       } else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
         this.unseqFiles.add(resource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 183e8da..8783aec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -124,8 +125,7 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
       if (currentName.getInnerCompactionCnt() != level
-          || currentFile.isCompactionCandidate()
-          || currentFile.isCompacting()) {
+          || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index a8dc5e5..6c9e83c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -626,6 +626,10 @@ public class TsFileResource {
     }
   }
 
+  public TsFileResourceStatus getStatus() {
+    return this.status;
+  }
+
   /**
    * check if any of the device lives over the given time bound. If the file is not closed, then
    * return true.
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 2b05370..0208d32 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -134,4 +134,35 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     }
     Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
   }
+
+  @Test
+  public void testFileSelectorWithUnclosedFile()
+      throws IOException, MetadataException, WriteProcessException {
+    IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(50);
+    TsFileResourceList tsFileResources = new TsFileResourceList();
+    createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+    createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+    seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
+    TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
+    tsFileManager.addAll(seqResources, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(
+        "testSG", "0", 0L, tsFileManager, true, new FakedInnerSpaceCompactionTaskFactory());
+    CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
+
+    long waitingTime = 0;
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
+      try {
+        Thread.sleep(100);
+        waitingTime += 100;
+        if (waitingTime > MAX_WAITING_TIME) {
+          Assert.fail();
+          break;
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
+  }
 }