You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/22 07:04:43 UTC

[iotdb] branch fix-cannot-select-files-when-seq-empty created (now 60869a0a5b9)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch fix-cannot-select-files-when-seq-empty
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 60869a0a5b9 [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)

This branch includes the following new commits:

     new 60869a0a5b9 [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch fix-cannot-select-files-when-seq-empty
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 60869a0a5b9ea9db5471787a565833154a78661d
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sat May 20 10:44:21 2023 +0800

    [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)
---
 .../compaction/schedule/CompactionScheduler.java   | 49 ++++++++++++++++------
 .../compaction/schedule/CompactionTaskManager.java |  1 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 12 ++++--
 .../db/engine/storagegroup/TsFileManager.java      |  1 +
 .../SizeTieredCompactionSelectorTest.java          | 38 +++++++++++++++++
 5 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 97a12af6707..5c3f1f80269 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -52,26 +52,37 @@ public class CompactionScheduler {
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
+  /**
+   * Select compaction tasks and submit them to CompactionTaskManager.
+   *
+   * @param tsFileManager the TsFileManager that contains the source files
+   * @param timePartition the time partition in which to perform the selection
+   * @return the number of submitted tasks
+   */
+  public static int scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
     if (!tsFileManager.isAllowCompaction()) {
-      return;
+      return 0;
     }
+    // this counter is named trySubmitCount because a task submitted to the queue may still be
+    // evicted later due to its low priority
+    int trySubmitCount = 0;
     try {
-      tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
-      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
-      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
+      trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+      trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
+      trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
     } catch (InterruptedException e) {
       LOGGER.error("Exception occurs when selecting compaction tasks", e);
       Thread.currentThread().interrupt();
     }
+    return trySubmitCount;
   }
 
-  public static void tryToSubmitInnerSpaceCompactionTask(
+  public static int tryToSubmitInnerSpaceCompactionTask(
       TsFileManager tsFileManager, long timePartition, boolean sequence)
       throws InterruptedException {
     if ((!config.isEnableSeqSpaceCompaction() && sequence)
         || (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
-      return;
+      return 0;
     }
 
     String storageGroupName = tsFileManager.getStorageGroupName();
@@ -94,6 +105,9 @@ public class CompactionScheduler {
             sequence
                 ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
                 : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+    // this counter is named trySubmitCount because a task submitted to the queue may still be
+    // evicted later due to its low priority
+    int trySubmitCount = 0;
     for (List<TsFileResource> task : taskList) {
       ICompactionPerformer performer =
           sequence
@@ -105,7 +119,7 @@ public class CompactionScheduler {
                   .getConfig()
                   .getInnerUnseqCompactionPerformer()
                   .createInstance();
-      CompactionTaskManager.getInstance()
+      if (CompactionTaskManager.getInstance()
           .addTaskToWaitingQueue(
               new InnerSpaceCompactionTask(
                   timePartition,
@@ -114,14 +128,17 @@ public class CompactionScheduler {
                   sequence,
                   performer,
                   CompactionTaskManager.currentTaskNum,
-                  tsFileManager.getNextCompactionTaskId()));
+                  tsFileManager.getNextCompactionTaskId()))) {
+        trySubmitCount++;
+      }
     }
+    return trySubmitCount;
   }
 
-  private static void tryToSubmitCrossSpaceCompactionTask(
+  private static int tryToSubmitCrossSpaceCompactionTask(
       TsFileManager tsFileManager, long timePartition) throws InterruptedException {
     if (!config.isEnableCrossSpaceCompaction()) {
-      return;
+      return 0;
     }
     String logicalStorageGroupName = tsFileManager.getStorageGroupName();
     String dataRegionId = tsFileManager.getDataRegionId();
@@ -137,8 +154,11 @@ public class CompactionScheduler {
         taskList.stream()
             .map(CrossCompactionTaskResource::getTotalMemoryCost)
             .collect(Collectors.toList());
+    // this counter is named trySubmitCount because a task submitted to the queue may still be
+    // evicted later due to its low priority
+    int trySubmitCount = 0;
     for (int i = 0, size = taskList.size(); i < size; ++i) {
-      CompactionTaskManager.getInstance()
+      if (CompactionTaskManager.getInstance()
           .addTaskToWaitingQueue(
               new CrossSpaceCompactionTask(
                   timePartition,
@@ -151,7 +171,10 @@ public class CompactionScheduler {
                       .createInstance(),
                   CompactionTaskManager.currentTaskNum,
                   memoryCost.get(i),
-                  tsFileManager.getNextCompactionTaskId()));
+                  tsFileManager.getNextCompactionTaskId()))) {
+        trySubmitCount++;
+      }
     }
+    return trySubmitCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 158cdaa34f6..024b02fc9bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -71,6 +71,7 @@ public class CompactionTaskManager implements IService {
   private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
 
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
+
   private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
       new FixedPriorityBlockingQueue<>(
           config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index a5a3d7e20be..4b4d4cbd393 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2187,17 +2187,21 @@ public class DataRegion implements IDataRegionForQuery {
     logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId);
   }
 
-  protected void executeCompaction() {
+  protected int executeCompaction() {
+    // this counter is named trySubmitCount because a task submitted to the queue may still be
+    // evicted later due to its low priority
+    int trySubmitCount = 0;
     try {
       List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
       // sort the time partition from largest to smallest
       timePartitions.sort(Comparator.reverseOrder());
       for (long timePartition : timePartitions) {
-        CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
+        trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
       }
     } catch (Throwable e) {
       logger.error("Meet error in compaction schedule.", e);
     }
+    return trySubmitCount;
   }
 
   /**
@@ -2328,10 +2332,10 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /** merge file under this database processor */
-  public void compact() {
+  public int compact() {
     writeLock("merge");
     try {
-      executeCompaction();
+      return executeCompaction();
     } finally {
       writeUnlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e0..c8ff4534099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -346,6 +346,7 @@ public class TsFileManager {
     readLock();
     try {
       Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet());
+      timePartitions.addAll(unsequenceFiles.keySet());
       return timePartitions;
     } finally {
       readUnlock();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
index 730a4e56795..7941628e8f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
@@ -20,14 +20,19 @@
 package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -65,4 +70,37 @@ public class SizeTieredCompactionSelectorTest {
             .selectInnerSpaceTask(manager.getOrCreateSequenceListByTimePartition(9))
             .size());
   }
+
+  @Test
+  public void testSubmitWhenSequenceFileIsEmpty() throws Exception {
+    DataRegion region = new DataRegion("root.test", "1");
+    TsFileManager manager = region.getTsFileManager();
+    int originCandidate =
+        IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(30);
+    boolean enableUnseqCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
+    CompactionTaskManager.getInstance().start();
+    try {
+      for (int i = 1; i < 91; ++i) {
+        TsFileResource resource = Mockito.mock(TsFileResource.class);
+        Mockito.when(resource.getTimePartition()).thenReturn(0L);
+        Mockito.when(resource.getTsFileSize()).thenReturn(100L);
+        Mockito.when(resource.getTsFile())
+            .thenReturn(new File(String.format("%d-%d-0-0.tsfile", i, i)));
+        Mockito.when(resource.getStatus()).thenReturn(TsFileResourceStatus.NORMAL);
+        manager.add(resource, false);
+      }
+      Assert.assertEquals(3, region.compact());
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setMaxInnerCompactionCandidateFileNum(originCandidate);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableUnseqSpaceCompaction(enableUnseqCompaction);
+      CompactionTaskManager.getInstance().shutdown(60_000L);
+    }
+  }
 }