You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/22 07:04:44 UTC

[iotdb] 01/01: [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch fix-cannot-select-files-when-seq-empty
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 60869a0a5b9ea9db5471787a565833154a78661d
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sat May 20 10:44:21 2023 +0800

    [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)
---
 .../compaction/schedule/CompactionScheduler.java   | 49 ++++++++++++++++------
 .../compaction/schedule/CompactionTaskManager.java |  1 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 12 ++++--
 .../db/engine/storagegroup/TsFileManager.java      |  1 +
 .../SizeTieredCompactionSelectorTest.java          | 38 +++++++++++++++++
 5 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 97a12af6707..5c3f1f80269 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -52,26 +52,37 @@ public class CompactionScheduler {
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
+  /**
+   * Select compaction task and submit them to CompactionTaskManager.
+   *
+   * @param tsFileManager tsfileManager that contains source files
+   * @param timePartition the time partition to execute the selection
+   * @return the count of submitted task
+   */
+  public static int scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
     if (!tsFileManager.isAllowCompaction()) {
-      return;
+      return 0;
     }
+    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+    // evicted due to the low priority of the task
+    int trySubmitCount = 0;
     try {
-      tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
-      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
-      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
+      trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+      trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
+      trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
     } catch (InterruptedException e) {
       LOGGER.error("Exception occurs when selecting compaction tasks", e);
       Thread.currentThread().interrupt();
     }
+    return trySubmitCount;
   }
 
-  public static void tryToSubmitInnerSpaceCompactionTask(
+  public static int tryToSubmitInnerSpaceCompactionTask(
       TsFileManager tsFileManager, long timePartition, boolean sequence)
       throws InterruptedException {
     if ((!config.isEnableSeqSpaceCompaction() && sequence)
         || (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
-      return;
+      return 0;
     }
 
     String storageGroupName = tsFileManager.getStorageGroupName();
@@ -94,6 +105,9 @@ public class CompactionScheduler {
             sequence
                 ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
                 : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+    // evicted due to the low priority of the task
+    int trySubmitCount = 0;
     for (List<TsFileResource> task : taskList) {
       ICompactionPerformer performer =
           sequence
@@ -105,7 +119,7 @@ public class CompactionScheduler {
                   .getConfig()
                   .getInnerUnseqCompactionPerformer()
                   .createInstance();
-      CompactionTaskManager.getInstance()
+      if (CompactionTaskManager.getInstance()
           .addTaskToWaitingQueue(
               new InnerSpaceCompactionTask(
                   timePartition,
@@ -114,14 +128,17 @@ public class CompactionScheduler {
                   sequence,
                   performer,
                   CompactionTaskManager.currentTaskNum,
-                  tsFileManager.getNextCompactionTaskId()));
+                  tsFileManager.getNextCompactionTaskId()))) {
+        trySubmitCount++;
+      }
     }
+    return trySubmitCount;
   }
 
-  private static void tryToSubmitCrossSpaceCompactionTask(
+  private static int tryToSubmitCrossSpaceCompactionTask(
       TsFileManager tsFileManager, long timePartition) throws InterruptedException {
     if (!config.isEnableCrossSpaceCompaction()) {
-      return;
+      return 0;
     }
     String logicalStorageGroupName = tsFileManager.getStorageGroupName();
     String dataRegionId = tsFileManager.getDataRegionId();
@@ -137,8 +154,11 @@ public class CompactionScheduler {
         taskList.stream()
             .map(CrossCompactionTaskResource::getTotalMemoryCost)
             .collect(Collectors.toList());
+    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+    // evicted due to the low priority of the task
+    int trySubmitCount = 0;
     for (int i = 0, size = taskList.size(); i < size; ++i) {
-      CompactionTaskManager.getInstance()
+      if (CompactionTaskManager.getInstance()
           .addTaskToWaitingQueue(
               new CrossSpaceCompactionTask(
                   timePartition,
@@ -151,7 +171,10 @@ public class CompactionScheduler {
                       .createInstance(),
                   CompactionTaskManager.currentTaskNum,
                   memoryCost.get(i),
-                  tsFileManager.getNextCompactionTaskId()));
+                  tsFileManager.getNextCompactionTaskId()))) {
+        trySubmitCount++;
+      }
     }
+    return trySubmitCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 158cdaa34f6..024b02fc9bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -71,6 +71,7 @@ public class CompactionTaskManager implements IService {
   private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
 
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
+
   private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
       new FixedPriorityBlockingQueue<>(
           config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index a5a3d7e20be..4b4d4cbd393 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2187,17 +2187,21 @@ public class DataRegion implements IDataRegionForQuery {
     logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId);
   }
 
-  protected void executeCompaction() {
+  protected int executeCompaction() {
+    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+    // evicted due to the low priority of the task
+    int trySubmitCount = 0;
     try {
       List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
       // sort the time partition from largest to smallest
       timePartitions.sort(Comparator.reverseOrder());
       for (long timePartition : timePartitions) {
-        CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
+        trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
       }
     } catch (Throwable e) {
       logger.error("Meet error in compaction schedule.", e);
     }
+    return trySubmitCount;
   }
 
   /**
@@ -2328,10 +2332,10 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /** merge file under this database processor */
-  public void compact() {
+  public int compact() {
     writeLock("merge");
     try {
-      executeCompaction();
+      return executeCompaction();
     } finally {
       writeUnlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e0..c8ff4534099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -346,6 +346,7 @@ public class TsFileManager {
     readLock();
     try {
       Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet());
+      timePartitions.addAll(unsequenceFiles.keySet());
       return timePartitions;
     } finally {
       readUnlock();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
index 730a4e56795..7941628e8f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
@@ -20,14 +20,19 @@
 package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -65,4 +70,37 @@ public class SizeTieredCompactionSelectorTest {
             .selectInnerSpaceTask(manager.getOrCreateSequenceListByTimePartition(9))
             .size());
   }
+
+  @Test
+  public void testSubmitWhenSequenceFileIsEmpty() throws Exception {
+    DataRegion region = new DataRegion("root.test", "1");
+    TsFileManager manager = region.getTsFileManager();
+    int originCandidate =
+        IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(30);
+    boolean enableUnseqCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
+    CompactionTaskManager.getInstance().start();
+    try {
+      for (int i = 1; i < 91; ++i) {
+        TsFileResource resource = Mockito.mock(TsFileResource.class);
+        Mockito.when(resource.getTimePartition()).thenReturn(0L);
+        Mockito.when(resource.getTsFileSize()).thenReturn(100L);
+        Mockito.when(resource.getTsFile())
+            .thenReturn(new File(String.format("%d-%d-0-0.tsfile", i, i)));
+        Mockito.when(resource.getStatus()).thenReturn(TsFileResourceStatus.NORMAL);
+        manager.add(resource, false);
+      }
+      Assert.assertEquals(3, region.compact());
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setMaxInnerCompactionCandidateFileNum(originCandidate);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableUnseqSpaceCompaction(enableUnseqCompaction);
+      CompactionTaskManager.getInstance().shutdown(60_000L);
+    }
+  }
 }