You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/03 08:47:26 UTC
[iotdb] branch master updated: [IOTDB-2811] Fix compaction exception handle failure cause by deletion of storage group (#5363)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 90e381d [IOTDB-2811] Fix compaction exception handle failure cause by deletion of storage group (#5363)
90e381d is described below
commit 90e381d7c8c4efbfda4e5eecc4c5590e5ecbd65e
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sun Apr 3 01:46:29 2022 -0700
[IOTDB-2811] Fix compaction exception handle failure cause by deletion of storage group (#5363)
---
.../cluster/log/snapshot/FileSnapshotTest.java | 10 +--
.../log/snapshot/PartitionedSnapshotTest.java | 4 +-
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 2 +-
.../cluster/server/member/DataGroupMemberTest.java | 2 +-
.../iotdb/db/integration/IoTDBFilePathUtilsIT.java | 2 +-
.../db/integration/IoTDBLoadExternalTsfileIT.java | 70 ++++++++--------
.../integration/IoTDBManageTsFileResourceIT.java | 8 +-
.../aligned/IoTDBLoadExternalAlignedTsFileIT.java | 64 +++++++--------
.../org/apache/iotdb/db/engine/StorageEngine.java | 12 ++-
.../engine/compaction/CompactionTaskManager.java | 95 ++++++++++++++--------
.../cross/AbstractCrossSpaceCompactionTask.java | 9 ++
.../task/RewriteCrossSpaceCompactionTask.java | 4 +
.../inner/AbstractInnerSpaceCompactionTask.java | 9 ++
.../inner/sizetiered/SizeTieredCompactionTask.java | 6 +-
.../compaction/task/AbstractCompactionTask.java | 20 ++++-
.../db/engine/storagegroup/TsFileResourceList.java | 5 +-
.../storagegroup/VirtualStorageGroupProcessor.java | 40 +++++++--
.../virtualSg/StorageGroupManager.java | 14 +++-
.../storagegroup/StorageGroupProcessorTest.java | 58 +++++++++++--
.../iotdb/db/engine/storagegroup/TTLTest.java | 4 +-
.../db/metadata/idtable/IDTableFlushTimeTest.java | 6 +-
.../db/sync/receiver/load/FileLoaderTest.java | 14 ++--
.../recover/SyncReceiverLogAnalyzerTest.java | 2 +-
23 files changed, 313 insertions(+), 147 deletions(-)
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
index 8919016..d43258b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java
@@ -120,7 +120,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
@@ -164,7 +164,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
@@ -214,7 +214,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
@@ -254,7 +254,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(j)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(10, loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
@@ -300,7 +300,7 @@ public class FileSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(10, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 7ff6c58..77ebfcd 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -113,7 +113,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
assertEquals(i, loadedFiles.get(i).getMaxPlanIndex());
@@ -190,7 +190,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(-1, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(0, loadedFiles.size());
assertEquals(0, processor.getUnSequenceFileList().size());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index 6701148..603f85f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -295,7 +295,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertEquals(9, processor.getPartitionMaxFileVersions(0));
- List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ List<TsFileResource> loadedFiles = processor.getSequenceFileList();
assertEquals(tsFileResources.size(), loadedFiles.size());
for (int i = 0; i < 9; i++) {
if (i != loadedFiles.get(i).getMaxPlanIndex()) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 4977b89..c82800f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -530,7 +530,7 @@ public class DataGroupMemberTest extends BaseMember {
processor.insert(insertPlan);
snapshot.getDefaultInstaller(dataGroupMember).install(snapshot, 0, false);
- assertEquals(2, processor.getSequenceFileTreeSet().size());
+ assertEquals(2, processor.getSequenceFileList().size());
assertEquals(1, processor.getUnSequenceFileList().size());
Deletion deletion = new Deletion(new PartialPath(TestUtils.getTestSg(0)), 0, 0);
assertTrue(
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFilePathUtilsIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFilePathUtilsIT.java
index d8e57b7..ee7b633 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFilePathUtilsIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFilePathUtilsIT.java
@@ -82,7 +82,7 @@ public class IoTDBFilePathUtilsIT {
}
Assert.assertNotNull(sgPath);
List<TsFileResource> tsFileResources =
- StorageEngine.getInstance().getProcessor(sgPath).getSequenceFileTreeSet();
+ StorageEngine.getInstance().getProcessor(sgPath).getSequenceFileList();
Assert.assertNotNull(tsFileResources);
for (TsFileResource tsFileResource : tsFileResources) {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3f6bb50..f415e25 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -44,7 +44,11 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -163,7 +167,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
File tmpDir =
new File(
@@ -179,7 +183,7 @@ public class IoTDBLoadExternalTsfileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertNotNull(tmpDir.listFiles());
assertEquals(1, tmpDir.listFiles().length >> 1);
@@ -189,7 +193,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir =
new File(
@@ -207,7 +211,7 @@ public class IoTDBLoadExternalTsfileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertNotNull(tmpDir.listFiles());
assertEquals(2, tmpDir.listFiles().length >> 1);
@@ -227,7 +231,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
File tmpDir =
new File(
resources
@@ -256,7 +260,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
tmpDir =
new File(
resources
@@ -296,13 +300,13 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
assertNotNull(tmpDir.listFiles());
assertEquals(
@@ -395,7 +399,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File tmpDir =
new File(
@@ -422,7 +426,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
@@ -448,7 +452,7 @@ public class IoTDBLoadExternalTsfileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertEquals(
1,
@@ -466,14 +470,14 @@ public class IoTDBLoadExternalTsfileIT {
3,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} else {
assertEquals(
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
} else if (config.getTimeIndexLevel().equals(TimeIndexLevel.FILE_TIME_INDEX)) {
@@ -487,7 +491,7 @@ public class IoTDBLoadExternalTsfileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
assertNotNull(tmpDir.listFiles());
@@ -596,7 +600,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File tmpDir =
new File(
@@ -623,7 +627,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
@@ -649,7 +653,7 @@ public class IoTDBLoadExternalTsfileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertEquals(
1,
@@ -667,14 +671,14 @@ public class IoTDBLoadExternalTsfileIT {
3,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} else {
assertEquals(
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
} else if (config.getTimeIndexLevel().equals(TimeIndexLevel.FILE_TIME_INDEX)) {
@@ -688,7 +692,7 @@ public class IoTDBLoadExternalTsfileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
assertNotNull(tmpDir.listFiles());
@@ -742,7 +746,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
File tmpDir =
new File(
@@ -772,7 +776,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
tmpDir =
new File(
resources
@@ -838,13 +842,13 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
assertEquals(2, tmpDir.listFiles().length);
for (File dir : tmpDir.listFiles()) {
@@ -865,7 +869,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
File vehicleTmpDir =
new File(
@@ -890,7 +894,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File testTmpDir = new File(vehicleTmpDir.getParentFile(), "root.test");
@@ -941,7 +945,7 @@ public class IoTDBLoadExternalTsfileIT {
1,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
// load test
@@ -962,7 +966,7 @@ public class IoTDBLoadExternalTsfileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} catch (Exception e) {
@@ -980,7 +984,7 @@ public class IoTDBLoadExternalTsfileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
for (TsFileResource resource : resources) {
statement.execute(String.format("remove '%s'", resource.getTsFilePath()));
@@ -989,14 +993,14 @@ public class IoTDBLoadExternalTsfileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
for (TsFileResource resource : resources) {
statement.execute(String.format("remove '%s'", resource.getTsFilePath()));
@@ -1005,7 +1009,7 @@ public class IoTDBLoadExternalTsfileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} catch (StorageEngineException | IllegalPathException e) {
Assert.fail();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
index e945f7d..fe50b4f 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
@@ -120,7 +120,7 @@ public class IoTDBManageTsFileResourceIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.sg1"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(5, seqResources.size());
// five tsFileResource are degraded in total, 2 are in seqResources and 3 are in
// unSeqResources
@@ -184,7 +184,7 @@ public class IoTDBManageTsFileResourceIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.sg1"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
for (TsFileResource resource : resources) {
assertEquals(
@@ -213,7 +213,7 @@ public class IoTDBManageTsFileResourceIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.sg1"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(5, seqResources.size());
// Four tsFileResource are degraded in total, 1 are in seqResources and 3 are in
@@ -257,7 +257,7 @@ public class IoTDBManageTsFileResourceIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.sg1"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(5, seqResources.size());
for (int i = 0; i < seqResources.size(); i++) {
assertTrue(seqResources.get(i).isClosed());
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBLoadExternalAlignedTsFileIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBLoadExternalAlignedTsFileIT.java
index 9da1221..5b0715e 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBLoadExternalAlignedTsFileIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBLoadExternalAlignedTsFileIT.java
@@ -165,7 +165,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
File tmpDir =
new File(
@@ -181,7 +181,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertNotNull(tmpDir.listFiles());
assertEquals(1, tmpDir.listFiles().length >> 1);
@@ -191,7 +191,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir =
new File(
@@ -209,7 +209,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertNotNull(tmpDir.listFiles());
assertEquals(2, tmpDir.listFiles().length >> 1);
@@ -229,7 +229,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
File tmpDir =
new File(
resources
@@ -258,7 +258,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
tmpDir =
new File(
resources
@@ -298,13 +298,13 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
assertNotNull(tmpDir.listFiles());
assertEquals(
@@ -397,7 +397,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File tmpDir =
new File(
@@ -424,7 +424,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
@@ -450,7 +450,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertEquals(
1,
@@ -468,14 +468,14 @@ public class IoTDBLoadExternalAlignedTsFileIT {
3,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} else {
assertEquals(
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
} else if (config.getTimeIndexLevel().equals(TimeIndexLevel.FILE_TIME_INDEX)) {
@@ -489,7 +489,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
assertNotNull(tmpDir.listFiles());
@@ -598,7 +598,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File tmpDir =
new File(
@@ -625,7 +625,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
@@ -651,7 +651,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
assertEquals(
1,
@@ -669,14 +669,14 @@ public class IoTDBLoadExternalAlignedTsFileIT {
3,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} else {
assertEquals(
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
} else if (config.getTimeIndexLevel().equals(TimeIndexLevel.FILE_TIME_INDEX)) {
@@ -690,7 +690,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
}
assertNotNull(tmpDir.listFiles());
@@ -744,7 +744,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
File tmpDir =
new File(
@@ -774,7 +774,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
tmpDir =
new File(
resources
@@ -840,13 +840,13 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
assertEquals(2, tmpDir.listFiles().length);
for (File dir : tmpDir.listFiles()) {
@@ -867,7 +867,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
File vehicleTmpDir =
new File(
@@ -892,7 +892,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
File testTmpDir = new File(vehicleTmpDir.getParentFile(), "root.test");
@@ -941,7 +941,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
1,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
// load test
@@ -962,7 +962,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
2,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} catch (Exception e) {
@@ -980,7 +980,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(1, resources.size());
for (TsFileResource resource : resources) {
statement.execute(String.format("remove '%s'", resource.getTsFilePath()));
@@ -989,14 +989,14 @@ public class IoTDBLoadExternalAlignedTsFileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.vehicle"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
resources =
new ArrayList<>(
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet());
+ .getSequenceFileList());
assertEquals(2, resources.size());
for (TsFileResource resource : resources) {
statement.execute(String.format("remove '%s'", resource.getTsFilePath()));
@@ -1005,7 +1005,7 @@ public class IoTDBLoadExternalAlignedTsFileIT {
0,
StorageEngine.getInstance()
.getProcessor(new PartialPath("root.test"))
- .getSequenceFileTreeSet()
+ .getSequenceFileList()
.size());
} catch (StorageEngineException | IllegalPathException e) {
Assert.fail();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7c92e42..f90b394 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -867,7 +867,7 @@ public class StorageEngine implements IService {
if (!processorMap.containsKey(storageGroupPath)) {
return;
}
-
+ abortCompactionTaskForStorageGroup(storageGroupPath);
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
releaseWalDirectByteBufferPoolInOneStorageGroup(storageGroupPath);
StorageGroupManager storageGroupManager = processorMap.remove(storageGroupPath);
@@ -876,6 +876,16 @@ public class StorageEngine implements IService {
storageGroupManager.stopSchedulerPool();
}
+ private void abortCompactionTaskForStorageGroup(PartialPath storageGroupPath) {
+ // Look up once: a containsKey/get pair can see the entry removed in between (null manager).
+ StorageGroupManager manager = processorMap.get(storageGroupPath);
+ if (manager == null) {
+ return;
+ }
+ manager.setAllowCompaction(false); // disallow compaction before aborting
+ manager.abortCompaction();
+ }
+
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
getProcessorDirectly(new PartialPath(getSgByEngineFile(newTsFileResource.getTsFile(), false)))
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index b0ecbe5..7d7c9ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -36,13 +36,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -62,10 +58,10 @@ public class CompactionTaskManager implements IService {
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
private FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
- // <logicalStorageGroupName,futureSet>, it is used to terminate all compaction tasks under the
- // logicalStorageGroup
- private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
- private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
+ // <fullStorageGroupName, <compactionTask, future>>: tracks every submitted compaction task of
+ // each virtual storage group together with its Future
+ private Map<String, Map<AbstractCompactionTask, Future<Void>>> storageGroupTasks =
+ new HashMap<>();
// The thread pool that periodically fetches and executes the compaction task from
// candidateCompactionTaskQueue to taskExecutionPool. The default number of threads for this pool
@@ -82,7 +78,7 @@ public class CompactionTaskManager implements IService {
}
@Override
- public void start() {
+ public synchronized void start() {
if (taskExecutionPool == null
&& IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() > 0) {
this.taskExecutionPool =
@@ -114,7 +110,7 @@ public class CompactionTaskManager implements IService {
}
@Override
- public void stop() {
+ public synchronized void stop() {
if (taskExecutionPool != null) {
taskExecutionPool.shutdownNow();
compactionTaskSubmissionThreadPool.shutdownNow();
@@ -126,7 +122,7 @@ public class CompactionTaskManager implements IService {
}
@Override
- public void waitAndStop(long milliseconds) {
+ public synchronized void waitAndStop(long milliseconds) {
if (taskExecutionPool != null) {
awaitTermination(taskExecutionPool, milliseconds);
awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
@@ -137,7 +133,7 @@ public class CompactionTaskManager implements IService {
}
@TestOnly
- public void waitAllCompactionFinish() {
+ public synchronized void waitAllCompactionFinish() {
long sleepingStartTime = 0;
long MAX_WAITING_TIME = 120_000L;
if (taskExecutionPool != null) {
@@ -163,7 +159,7 @@ public class CompactionTaskManager implements IService {
}
}
- private void waitTermination() {
+ private synchronized void waitTermination() {
long startTime = System.currentTimeMillis();
while (!taskExecutionPool.isTerminated()) {
int timeMillis = 0;
@@ -187,7 +183,7 @@ public class CompactionTaskManager implements IService {
logger.info("CompactionManager stopped");
}
- private void awaitTermination(ExecutorService service, long milliseconds) {
+ private synchronized void awaitTermination(ExecutorService service, long milliseconds) {
try {
service.shutdown();
service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS);
@@ -210,8 +206,7 @@ public class CompactionTaskManager implements IService {
*/
public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compactionTask)
throws InterruptedException {
- if (!candidateCompactionTaskQueue.contains(compactionTask)
- && !runningCompactionTaskList.contains(compactionTask)) {
+ if (!candidateCompactionTaskQueue.contains(compactionTask) && !isTaskRunning(compactionTask)) {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);
@@ -224,6 +219,13 @@ public class CompactionTaskManager implements IService {
return false;
}
+ /**
+ * Returns whether the task is currently tracked in storageGroupTasks, i.e. it was submitted to
+ * the execution pool and has not been removed from the map yet.
+ *
+ * @param task the compaction task to look up
+ * @return true if the task is registered under its full storage group name
+ */
+ private boolean isTaskRunning(AbstractCompactionTask task) {
+ // Plain lookup on purpose: computeIfAbsent would insert an empty map for every storage group
+ // that is merely queried, re-creating entries that abortCompaction has removed.
+ Map<AbstractCompactionTask, Future<Void>> tasksOfStorageGroup =
+ storageGroupTasks.get(task.getFullStorageGroupName());
+ return tasksOfStorageGroup != null && tasksOfStorageGroup.containsKey(task);
+ }
+
/**
* This method will submit task cached in queue with most priority to execution thread pool if
* there is available thread.
@@ -241,9 +243,8 @@ public class CompactionTaskManager implements IService {
if (task != null && task.checkValidAndSetMerging()) {
submitTask(task);
- runningCompactionTaskList.add(task);
CompactionMetricsManager.recordTaskInfo(
- task, CompactionTaskStatus.READY_TO_EXECUTE, runningCompactionTaskList.size());
+ task, CompactionTaskStatus.READY_TO_EXECUTE, currentTaskNum.get());
}
}
} catch (InterruptedException e) {
@@ -279,10 +280,13 @@ public class CompactionTaskManager implements IService {
}
public synchronized void removeRunningTaskFromList(AbstractCompactionTask task) {
- runningCompactionTaskList.remove(task);
+ String storageGroupName = task.getFullStorageGroupName();
+ if (storageGroupTasks.containsKey(storageGroupName)) {
+ storageGroupTasks.get(storageGroupName).remove(task);
+ }
// add metrics
CompactionMetricsManager.recordTaskInfo(
- task, CompactionTaskStatus.FINISHED, runningCompactionTaskList.size());
+ task, CompactionTaskStatus.FINISHED, currentTaskNum.get());
}
/**
@@ -290,10 +294,13 @@ public class CompactionTaskManager implements IService {
*
* @throws RejectedExecutionException
*/
- public synchronized void submitTask(Callable<Void> compactionMergeTask)
+ public synchronized void submitTask(AbstractCompactionTask compactionTask)
throws RejectedExecutionException {
if (taskExecutionPool != null && !taskExecutionPool.isTerminated()) {
- taskExecutionPool.submit(compactionMergeTask);
+ Future<Void> future = taskExecutionPool.submit(compactionTask);
+ storageGroupTasks
+ .computeIfAbsent(compactionTask.getFullStorageGroupName(), x -> new HashMap<>())
+ .put(compactionTask, future);
return;
}
logger.warn(
@@ -304,21 +311,33 @@ public class CompactionTaskManager implements IService {
}
/**
- * Abort all compactions of a storage group. The caller must acquire the write lock of the
- * corresponding storage group.
+ * Abort all compactions of a storage group. The running compaction tasks are returned as a
+ * list; the compaction threads for the storage group are not terminated until all the tasks in
+ * the list have finished. The caller can use the function isAnyTaskInListStillRunning to
+ * check this.
*/
- public void abortCompaction(String fullStorageGroupName) {
- Set<Future<Void>> subTasks =
- storageGroupTasks.getOrDefault(fullStorageGroupName, Collections.emptySet());
- candidateCompactionTaskQueue.clear();
- Iterator<Future<Void>> subIterator = subTasks.iterator();
- while (subIterator.hasNext()) {
- Future<Void> next = subIterator.next();
- if (!next.isDone() && !next.isCancelled()) {
- next.cancel(true);
+ public synchronized List<AbstractCompactionTask> abortCompaction(String storageGroupName) {
+ List<AbstractCompactionTask> compactionTaskOfCurSG = new ArrayList<>();
+ if (storageGroupTasks.containsKey(storageGroupName)) {
+ for (Map.Entry<AbstractCompactionTask, Future<Void>> taskFutureEntry :
+ storageGroupTasks.get(storageGroupName).entrySet()) {
+ taskFutureEntry.getValue().cancel(true);
+ compactionTaskOfCurSG.add(taskFutureEntry.getKey());
}
- subIterator.remove();
}
+
+ storageGroupTasks.remove(storageGroupName);
+
+ candidateCompactionTaskQueue.clear();
+ return compactionTaskOfCurSG;
+ }
+
+ /**
+ * Tells whether at least one of the given tasks has started executing but has not finished yet.
+ *
+ * @param compactionTasks the tasks to inspect, e.g. the list returned by abortCompaction
+ * @return true if any task reports isTaskRan() and not isTaskFinished(), false otherwise
+ */
+ public boolean isAnyTaskInListStillRunning(List<AbstractCompactionTask> compactionTasks) {
+ for (AbstractCompactionTask candidate : compactionTasks) {
+ boolean startedButNotDone = candidate.isTaskRan() && !candidate.isTaskFinished();
+ if (startedButNotDone) {
+ return true;
+ }
+ }
+ return false;
}
public int getExecutingTaskCount() {
@@ -330,7 +349,11 @@ public class CompactionTaskManager implements IService {
}
public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
- return new ArrayList<>(runningCompactionTaskList);
+ List<AbstractCompactionTask> tasks = new ArrayList<>();
+ for (Map<AbstractCompactionTask, Future<Void>> taskFutureMap : storageGroupTasks.values()) {
+ tasks.addAll(taskFutureMap.keySet());
+ }
+ return tasks;
}
public long getFinishTaskNum() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionTask.java
index d8b1bbd..633fb83 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionTask.java
@@ -60,6 +60,10 @@ public abstract class AbstractCrossSpaceCompactionTask extends AbstractCompactio
@Override
public boolean checkValidAndSetMerging() {
+ if (!tsFileManager.isAllowCompaction()) {
+ return false;
+ }
+
for (TsFileResource resource : selectedSequenceFiles) {
if (resource.isCompacting() || !resource.isClosed() || !resource.getTsFile().exists()) {
return false;
@@ -97,6 +101,11 @@ public abstract class AbstractCrossSpaceCompactionTask extends AbstractCompactio
}
@Override
+ public int hashCode() {
+ // Tasks are used as HashMap keys by CompactionTaskManager, so the hash is derived from the
+ // task's string form. NOTE(review): confirm that equals() agrees with toString().
+ return toString().hashCode();
+ }
+
+ @Override
public void resetCompactionCandidateStatusForAllSourceFiles() {
selectedSequenceFiles.forEach(x -> x.setStatus(TsFileResourceStatus.CLOSED));
selectedUnsequenceFiles.forEach(x -> x.setStatus(TsFileResourceStatus.CLOSED));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 9c1a1ac..5426bfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -147,10 +147,12 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
CompactionUtils.compact(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);
+ checkInterrupted();
CompactionUtils.moveTargetFile(targetTsfileResourceList, false, fullStorageGroupName);
CompactionUtils.combineModsInCompaction(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);
+ checkInterrupted();
// update tsfile resource in memory
tsFileManager.replace(
selectedSeqTsFileResourceList,
@@ -160,7 +162,9 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
true);
releaseReadAndLockWrite(selectedSeqTsFileResourceList);
+ checkInterrupted();
releaseReadAndLockWrite(selectedUnSeqTsFileResourceList);
+ checkInterrupted();
deleteOldFiles(selectedSeqTsFileResourceList);
deleteOldFiles(selectedUnSeqTsFileResourceList);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
index 4f952dc..8b60330 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
@@ -117,6 +117,10 @@ public abstract class AbstractInnerSpaceCompactionTask extends AbstractCompactio
@Override
public boolean checkValidAndSetMerging() {
+ if (!tsFileManager.isAllowCompaction()) {
+ return false;
+ }
+
for (TsFileResource resource : selectedTsFileResourceList) {
if (resource.isCompacting() | !resource.isClosed() || !resource.getTsFile().exists()) {
return false;
@@ -145,6 +149,11 @@ public abstract class AbstractInnerSpaceCompactionTask extends AbstractCompactio
}
@Override
+ public int hashCode() {
+ // Tasks are used as HashMap keys by CompactionTaskManager, so the hash is derived from the
+ // task's string form. NOTE(review): confirm that equals() agrees with toString().
+ return toString().hashCode();
+ }
+
+ @Override
public void resetCompactionCandidateStatusForAllSourceFiles() {
selectedTsFileResourceList.forEach(x -> x.setStatus(TsFileResourceStatus.CLOSED));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 6314eca..b19e43a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -130,10 +130,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
InnerSpaceCompactionUtils.combineModsInCompaction(
selectedTsFileResourceList, targetTsFileResource);
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", fullStorageGroupName));
- }
+ checkInterrupted();
// replace the old files with new file, the new is in same position as the old
if (sequence) {
@@ -162,6 +159,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
isHoldingReadLock[i] = false;
selectedTsFileResourceList.get(i).writeLock();
isHoldingWriteLock[i] = true;
+ checkInterrupted();
}
if (targetTsFileResource.getTsFile().length()
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index da46187..79d45df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -43,6 +43,8 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
protected final AtomicInteger currentTaskNum;
protected final TsFileManager tsFileManager;
protected long timeCost = 0L;
+ protected volatile boolean ran = false;
+ protected volatile boolean finished = false;
public AbstractCompactionTask(
String fullStorageGroupName,
@@ -61,6 +63,7 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
@Override
public Void call() throws Exception {
+ ran = true;
long startTime = System.currentTimeMillis();
currentTaskNum.incrementAndGet();
try {
@@ -68,9 +71,10 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
+ this.currentTaskNum.decrementAndGet();
CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
timeCost = System.currentTimeMillis() - startTime;
- this.currentTaskNum.decrementAndGet();
+ finished = true;
}
return null;
@@ -107,4 +111,18 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
public long getTimeCost() {
return timeCost;
}
+
+ /**
+ * Cooperative cancellation point: throws if the current thread's interrupt flag is set. The
+ * flag is only read through isInterrupted(), so it is still set while the exception propagates.
+ *
+ * @throws InterruptedException if the current thread has been interrupted
+ */
+ protected void checkInterrupted() throws InterruptedException {
+ if (!Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ throw new InterruptedException(String.format("%s [Compaction] abort", fullStorageGroupName));
+ }
+
+ /** Returns true once call() has started executing this task. */
+ public boolean isTaskRan() {
+ return ran;
+ }
+
+ /**
+ * Returns true once call() has reached the end of its finally block, whether or not the
+ * compaction itself succeeded.
+ */
+ public boolean isTaskFinished() {
+ return finished;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java
index 2f71a8e..091d8f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java
@@ -47,6 +47,9 @@ public class TsFileResourceList implements List<TsFileResource> {
* @param newNode the file to insert
*/
public void insertBefore(TsFileResource node, TsFileResource newNode) {
+ if (newNode.equals(node)) {
+ return;
+ }
newNode.prev = node.prev;
newNode.next = node;
if (node.prev == null) {
@@ -135,7 +138,7 @@ public class TsFileResourceList implements List<TsFileResource> {
* node's, the new node will be inserted to the tail of the list.
*/
public boolean keepOrderInsert(TsFileResource newNode) throws IOException {
- if (newNode.prev != null || newNode.next != null) {
+ if (newNode.prev != null || newNode.next != null || (count == 1 && header == newNode)) {
// this node already in a list
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 87d7b4e..afef443 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverManager;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
@@ -2887,7 +2888,7 @@ public class VirtualStorageGroupProcessor {
this.dataTTL = dataTTL;
}
- public List<TsFileResource> getSequenceFileTreeSet() {
+ public List<TsFileResource> getSequenceFileList() {
return tsFileManager.getTsFileList(true);
}
@@ -2922,7 +2923,7 @@ public class VirtualStorageGroupProcessor {
tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
- || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileTreeSet())
+ || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileList())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList());
}
@@ -2974,10 +2975,13 @@ public class VirtualStorageGroupProcessor {
// this requires blocking all other activities
writeLock("removePartitions");
try {
- tsFileManager.setAllowCompaction(false);
- // abort ongoing comapctions and merges
- CompactionTaskManager.getInstance()
- .abortCompaction(logicalStorageGroupName + "-" + virtualStorageGroupId);
+ // abort ongoing compaction
+ abortCompaction();
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Wait two seconds for the compaction thread to terminate
+ }
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet(), true);
removePartitions(filter, workUnsequenceTsFileProcessors.entrySet(), false);
@@ -2991,6 +2995,21 @@ public class VirtualStorageGroupProcessor {
}
}
+ /**
+ * Forbids further compaction in this virtual storage group, cancels its submitted compaction
+ * tasks and blocks until none of the cancelled tasks is still executing.
+ *
+ * <p>If the waiting thread is interrupted, the wait is given up early and the interrupt flag is
+ * restored so that callers can observe the interruption.
+ */
+ public void abortCompaction() {
+ tsFileManager.setAllowCompaction(false);
+ List<AbstractCompactionTask> runningTasks =
+ CompactionTaskManager.getInstance()
+ .abortCompaction(logicalStorageGroupName + "-" + virtualStorageGroupId);
+ // Cancelling only interrupts the tasks; poll until every task that had started has ended.
+ while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (InterruptedException e) {
+ logger.error("Thread get interrupted when waiting compaction to finish", e);
+ // sleep() cleared the interrupt flag when it threw; set it again instead of swallowing it.
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
// may remove the processorEntrys
private void removePartitions(
TimePartitionFilter filter,
@@ -3146,6 +3165,10 @@ public class VirtualStorageGroupProcessor {
this.customFlushListeners = customFlushListeners;
}
+ /**
+ * Enables or disables compaction for this processor by forwarding the flag to its
+ * TsFileManager.
+ *
+ * @param allowCompaction true to allow compaction, false to forbid it
+ */
+ public void setAllowCompaction(boolean allowCompaction) {
+ this.tsFileManager.setAllowCompaction(allowCompaction);
+ }
+
private enum LoadTsFileType {
LOAD_SEQUENCE,
LOAD_UNSEQUENCE
@@ -3207,4 +3230,9 @@ public class VirtualStorageGroupProcessor {
public ILastFlushTimeManager getLastFlushTimeManager() {
return lastFlushTimeManager;
}
+
+ /** Exposes the TsFileManager of this processor; intended for tests only. */
+ @TestOnly
+ public TsFileManager getTsFileManager() {
+ return tsFileManager;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index da482f9..3f895d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -417,7 +417,7 @@ public class StorageGroupManager {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
- List<TsFileResource> allResources = virtualStorageGroupProcessor.getSequenceFileTreeSet();
+ List<TsFileResource> allResources = virtualStorageGroupProcessor.getSequenceFileList();
allResources.addAll(virtualStorageGroupProcessor.getUnSequenceFileList());
for (TsFileResource tsfile : allResources) {
if (!tsfile.isClosed()) {
@@ -507,6 +507,18 @@ public class StorageGroupManager {
isSettling.set(settling);
}
+ /**
+ * Enables or disables compaction on every virtual storage group processor that has been
+ * created so far.
+ *
+ * @param allowCompaction true to allow compaction, false to forbid it
+ */
+ public void setAllowCompaction(boolean allowCompaction) {
+ for (VirtualStorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ // Slots of the processor array may be null (other loops in this class guard against it).
+ if (processor != null) {
+ processor.setAllowCompaction(allowCompaction);
+ }
+ }
+ }
+
+ /**
+ * Aborts the compaction tasks of every virtual storage group processor that has been created
+ * so far, one processor after another.
+ */
+ public void abortCompaction() {
+ for (VirtualStorageGroupProcessor processor : virtualStorageGroupProcessor) {
+ // Slots of the processor array may be null (other loops in this class guard against it).
+ if (processor != null) {
+ processor.abortCompaction();
+ }
+ }
+ }
+
public AtomicBoolean getIsSettling() {
return isSettling;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index f52b38d..1ce0722 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -60,10 +63,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
public class StorageGroupProcessorTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -706,6 +711,53 @@ public class StorageGroupProcessorTest {
}
@Test
+ // Submits an inner-space compaction over ten sequence files, deletes the storage group shortly
+ // afterwards, and checks that no source file, target file or compaction log is left behind.
+ public void testDeleteStorageGroupWhenCompacting() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(10);
+ try {
+ // Insert one point per iteration and request a close each time.
+ for (int j = 0; j < 10; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ processor.asyncCloseAllWorkingTsFileProcessors();
+ }
+ processor.syncCloseAllWorkingTsFileProcessors();
+ SizeTieredCompactionTask task =
+ new SizeTieredCompactionTask(
+ storageGroup,
+ "0",
+ 0,
+ processor.getTsFileManager(),
+ processor.getSequenceFileList(),
+ true,
+ new AtomicInteger(0));
+ CompactionTaskManager.getInstance().submitTask(task);
+ // NOTE(review): timing-based; presumably gives the task time to start before the deletion.
+ Thread.sleep(20);
+ StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
+ // NOTE(review): timing-based; presumably lets any remaining cleanup settle before asserting.
+ Thread.sleep(500);
+
+ // No source file may remain on disk.
+ for (TsFileResource resource : processor.getSequenceFileList()) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ }
+ // Neither the compaction target file nor its compaction log may exist.
+ TsFileResource targetTsFileResource =
+ TsFileNameGenerator.getInnerCompactionTargetFileResource(
+ processor.getSequenceFileList(), true);
+ Assert.assertFalse(targetTsFileResource.getTsFile().exists());
+ String dataDirectory = targetTsFileResource.getTsFile().getParent();
+ File logFile =
+ new File(
+ dataDirectory
+ + File.separator
+ + targetTsFileResource.getTsFile().getName()
+ + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
+ Assert.assertFalse(logFile.exists());
+ // The system must not be read-only, and this processor must still allow compaction.
+ Assert.assertFalse(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+ Assert.assertTrue(processor.getTsFileManager().isAllowCompaction());
+ } finally {
+ // Undo the compaction configuration change made at the top of the test.
+ new CompactionConfigRestorer().restoreCompactionConfig();
+ }
+ }
+
+ @Test
public void testTimedFlushSeqMemTable()
throws IllegalPathException, InterruptedException, WriteProcessException,
TriggerExecutionException, ShutdownException {
@@ -874,11 +926,7 @@ public class StorageGroupProcessorTest {
class DummySGP extends VirtualStorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
- super(
- systemInfoDir,
- storageGroupName,
- new TsFileFlushPolicy.DirectFlushPolicy(),
- storageGroupName);
+ super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index a87b895..29f6b78 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -435,13 +435,13 @@ public class TTLTest {
prepareData();
virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
- assertEquals(4, virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(4, virtualStorageGroupProcessor.getSequenceFileList().size());
assertEquals(4, virtualStorageGroupProcessor.getUnSequenceFileList().size());
virtualStorageGroupProcessor.setDataTTL(0);
virtualStorageGroupProcessor.checkFilesTTL();
- assertEquals(0, virtualStorageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(0, virtualStorageGroupProcessor.getSequenceFileList().size());
assertEquals(0, virtualStorageGroupProcessor.getUnSequenceFileList().size());
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
index 4849265..55cfb21 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
@@ -97,7 +97,7 @@ public class IDTableFlushTimeTest {
VirtualStorageGroupProcessor storageGroupProcessor =
StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
- assertEquals(2, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
}
@@ -112,7 +112,7 @@ public class IDTableFlushTimeTest {
VirtualStorageGroupProcessor storageGroupProcessor =
StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
- assertEquals(1, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
}
@@ -141,7 +141,7 @@ public class IDTableFlushTimeTest {
VirtualStorageGroupProcessor storageGroupProcessor =
StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
- assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 0b669d9..e8786f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -166,7 +166,7 @@ public class FileLoaderTest {
for (int i = 0; i < 3; i++) {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
- assertTrue(processor.getSequenceFileTreeSet().isEmpty());
+ assertTrue(processor.getSequenceFileList().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -200,8 +200,8 @@ public class FileLoaderTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
- assertEquals(10, processor.getSequenceFileTreeSet().size());
- for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
+ assertEquals(10, processor.getSequenceFileList().size());
+ for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getTsFile().getAbsolutePath());
}
assertTrue(processor.getUnSequenceFileList().isEmpty());
@@ -285,7 +285,7 @@ public class FileLoaderTest {
for (int i = 0; i < 3; i++) {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
- assertTrue(processor.getSequenceFileTreeSet().isEmpty());
+ assertTrue(processor.getSequenceFileList().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}
@@ -319,8 +319,8 @@ public class FileLoaderTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
- assertEquals(25, processor.getSequenceFileTreeSet().size());
- for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
+ assertEquals(25, processor.getSequenceFileList().size());
+ for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
loadedFileMap.get(SG_NAME + i).add(tsFileResource.getTsFile().getAbsolutePath());
}
assertTrue(processor.getUnSequenceFileList().isEmpty());
@@ -377,7 +377,7 @@ public class FileLoaderTest {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
- for (TsFileResource tsFileResource : processor.getSequenceFileTreeSet()) {
+ for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
loadedFileMap.get(SG_NAME + i).add(tsFileResource.getTsFile().getAbsolutePath());
}
assertTrue(processor.getUnSequenceFileList().isEmpty());
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
index 550eb7f..d6cf387 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
@@ -156,7 +156,7 @@ public class SyncReceiverLogAnalyzerTest {
for (int i = 0; i < 3; i++) {
VirtualStorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME + i));
- assertTrue(processor.getSequenceFileTreeSet().isEmpty());
+ assertTrue(processor.getSequenceFileList().isEmpty());
assertTrue(processor.getUnSequenceFileList().isEmpty());
}