You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/01 14:58:45 UTC
[iotdb] branch master updated: [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new be883fd [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
be883fd is described below
commit be883fd8ccca035e627317ba75839041109fcf47
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Fri Apr 1 22:56:46 2022 +0800
[IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
---
.../manage/CrossSpaceCompactionResource.java | 3 ++-
.../sizetiered/SizeTieredCompactionSelector.java | 4 +--
.../db/engine/storagegroup/TsFileResource.java | 4 +++
.../inner/InnerCompactionSchedulerTest.java | 31 ++++++++++++++++++++++
4 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index 7ced2fb..fecdd78 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -80,7 +81,7 @@ public class CrossSpaceCompactionResource {
*/
private void filterUnseqResource(List<TsFileResource> unseqResources) {
for (TsFileResource resource : unseqResources) {
- if (resource.isCompacting() || resource.isCompactionCandidate() || !resource.isClosed()) {
+ if (resource.getStatus() != TsFileResourceStatus.CLOSED) {
return;
} else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
this.unseqFiles.add(resource);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 183e8da..8783aec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -124,8 +125,7 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level
- || currentFile.isCompactionCandidate()
- || currentFile.isCompacting()) {
+ || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
selectedFileList.clear();
selectedFileSize = 0L;
continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index a8dc5e5..6c9e83c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -626,6 +626,10 @@ public class TsFileResource {
}
}
+ public TsFileResourceStatus getStatus() {
+   return status; // the status field currently stored on this resource
+ }
+
/**
* check if any of the device lives over the given time bound. If the file is not closed, then
* return true.
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 2b05370..0208d32 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -134,4 +134,35 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
}
Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
}
+
+ /** With one sequence file marked UNCLOSED, scheduling must leave 4 files in the sequence list. */
+ @Test
+ public void testFileSelectorWithUnclosedFile()
+     throws IOException, MetadataException, WriteProcessException {
+   IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
+   IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(50);
+   createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+   createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+   seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED); // mark one file as not closed
+   TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
+   tsFileManager.addAll(seqResources, true);
+   CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(
+       "testSG", "0", 0L, tsFileManager, true, new FakedInnerSpaceCompactionTaskFactory());
+   CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
+   // Poll until no task is executing; give up once the slept time exceeds MAX_WAITING_TIME.
+   long waitingTime = 0;
+   while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
+     try {
+       Thread.sleep(100);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt(); // keep the interrupt visible to the test runner
+       Assert.fail("Interrupted while waiting for compaction tasks to finish");
+     }
+     waitingTime += 100;
+     if (waitingTime > MAX_WAITING_TIME) {
+       Assert.fail("Compaction tasks still executing after waiting " + waitingTime + " ms");
+     }
+   }
+   Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
+ }
}