You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/20 02:44:28 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new d85111fa1e4 [To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)
d85111fa1e4 is described below
commit d85111fa1e4ccc56f4b3c185e21fc223cbed87e3
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sat May 20 10:44:21 2023 +0800
[To rel/1.1] [IOTDB-5903] Fix cannot select any inner space compaction task when there is only unsequence data (#9892)
---
.../compaction/schedule/CompactionScheduler.java | 49 ++++++++++++++++------
.../compaction/schedule/CompactionTaskManager.java | 1 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 12 ++++--
.../db/engine/storagegroup/TsFileManager.java | 1 +
.../SizeTieredCompactionSelectorTest.java | 38 +++++++++++++++++
5 files changed, 84 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 97a12af6707..5c3f1f80269 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -52,26 +52,37 @@ public class CompactionScheduler {
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
+ /**
+ * Select compaction task and submit them to CompactionTaskManager.
+ *
+ * @param tsFileManager tsfileManager that contains source files
+ * @param timePartition the time partition to execute the selection
+ * @return the count of submitted task
+ */
+ public static int scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
if (!tsFileManager.isAllowCompaction()) {
- return;
+ return 0;
}
+ // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+ // evicted due to the low priority of the task
+ int trySubmitCount = 0;
try {
- tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
- tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
- tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
+ trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+ trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
+ trySubmitCount += tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
} catch (InterruptedException e) {
LOGGER.error("Exception occurs when selecting compaction tasks", e);
Thread.currentThread().interrupt();
}
+ return trySubmitCount;
}
- public static void tryToSubmitInnerSpaceCompactionTask(
+ public static int tryToSubmitInnerSpaceCompactionTask(
TsFileManager tsFileManager, long timePartition, boolean sequence)
throws InterruptedException {
if ((!config.isEnableSeqSpaceCompaction() && sequence)
|| (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
- return;
+ return 0;
}
String storageGroupName = tsFileManager.getStorageGroupName();
@@ -94,6 +105,9 @@ public class CompactionScheduler {
sequence
? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
: tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+ // evicted due to the low priority of the task
+ int trySubmitCount = 0;
for (List<TsFileResource> task : taskList) {
ICompactionPerformer performer =
sequence
@@ -105,7 +119,7 @@ public class CompactionScheduler {
.getConfig()
.getInnerUnseqCompactionPerformer()
.createInstance();
- CompactionTaskManager.getInstance()
+ if (CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
new InnerSpaceCompactionTask(
timePartition,
@@ -114,14 +128,17 @@ public class CompactionScheduler {
sequence,
performer,
CompactionTaskManager.currentTaskNum,
- tsFileManager.getNextCompactionTaskId()));
+ tsFileManager.getNextCompactionTaskId()))) {
+ trySubmitCount++;
+ }
}
+ return trySubmitCount;
}
- private static void tryToSubmitCrossSpaceCompactionTask(
+ private static int tryToSubmitCrossSpaceCompactionTask(
TsFileManager tsFileManager, long timePartition) throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
- return;
+ return 0;
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
@@ -137,8 +154,11 @@ public class CompactionScheduler {
taskList.stream()
.map(CrossCompactionTaskResource::getTotalMemoryCost)
.collect(Collectors.toList());
+ // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+ // evicted due to the low priority of the task
+ int trySubmitCount = 0;
for (int i = 0, size = taskList.size(); i < size; ++i) {
- CompactionTaskManager.getInstance()
+ if (CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
new CrossSpaceCompactionTask(
timePartition,
@@ -151,7 +171,10 @@ public class CompactionScheduler {
.createInstance(),
CompactionTaskManager.currentTaskNum,
memoryCost.get(i),
- tsFileManager.getNextCompactionTaskId()));
+ tsFileManager.getNextCompactionTaskId()))) {
+ trySubmitCount++;
+ }
}
+ return trySubmitCount;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 158cdaa34f6..024b02fc9bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -71,6 +71,7 @@ public class CompactionTaskManager implements IService {
private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
+
private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
new FixedPriorityBlockingQueue<>(
config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 42772794dfd..7127c2a96c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2147,17 +2147,21 @@ public class DataRegion implements IDataRegionForQuery {
logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId);
}
- protected void executeCompaction() {
+ protected int executeCompaction() {
+ // the name of this variable is trySubmitCount, because the task submitted to the queue could be
+ // evicted due to the low priority of the task
+ int trySubmitCount = 0;
try {
List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
// sort the time partition from largest to smallest
timePartitions.sort(Comparator.reverseOrder());
for (long timePartition : timePartitions) {
- CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
+ trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
}
} catch (Throwable e) {
logger.error("Meet error in compaction schedule.", e);
}
+ return trySubmitCount;
}
/**
@@ -2288,10 +2292,10 @@ public class DataRegion implements IDataRegionForQuery {
}
/** merge file under this database processor */
- public void compact() {
+ public int compact() {
writeLock("merge");
try {
- executeCompaction();
+ return executeCompaction();
} finally {
writeUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 86fa5e742e0..c8ff4534099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -346,6 +346,7 @@ public class TsFileManager {
readLock();
try {
Set<Long> timePartitions = new HashSet<>(sequenceFiles.keySet());
+ timePartitions.addAll(unsequenceFiles.keySet());
return timePartitions;
} finally {
readUnlock();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
index 730a4e56795..7941628e8f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelectorTest.java
@@ -20,14 +20,19 @@
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.selector.impl.SizeTieredCompactionSelector;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -65,4 +70,37 @@ public class SizeTieredCompactionSelectorTest {
.selectInnerSpaceTask(manager.getOrCreateSequenceListByTimePartition(9))
.size());
}
+
+ @Test
+ public void testSubmitWhenSequenceFileIsEmpty() throws Exception {
+ DataRegion region = new DataRegion("root.test", "1");
+ TsFileManager manager = region.getTsFileManager();
+ int originCandidate =
+ IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+ IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(30);
+ boolean enableUnseqCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
+ CompactionTaskManager.getInstance().start();
+ try {
+ for (int i = 1; i < 91; ++i) {
+ TsFileResource resource = Mockito.mock(TsFileResource.class);
+ Mockito.when(resource.getTimePartition()).thenReturn(0L);
+ Mockito.when(resource.getTsFileSize()).thenReturn(100L);
+ Mockito.when(resource.getTsFile())
+ .thenReturn(new File(String.format("%d-%d-0-0.tsfile", i, i)));
+ Mockito.when(resource.getStatus()).thenReturn(TsFileResourceStatus.NORMAL);
+ manager.add(resource, false);
+ }
+ Assert.assertEquals(3, region.compact());
+ } finally {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMaxInnerCompactionCandidateFileNum(originCandidate);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqCompaction);
+ CompactionTaskManager.getInstance().shutdown(60_000L);
+ }
+ }
}