You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/26 08:39:51 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole process of deletion (#6745)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 0ca58726df [To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole process of deletion (#6745)
0ca58726df is described below
commit 0ca58726dfe8d174bd75a42d4a328d8e8214d141
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Jul 26 16:39:44 2022 +0800
[To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole process of deletion (#6745)
---
.../storagegroup/VirtualStorageGroupProcessor.java | 133 +++++++++++++--------
.../storagegroup/StorageGroupProcessorTest.java | 87 +++++++++++++-
2 files changed, 165 insertions(+), 55 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index db39f15ce0..583bc30b32 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -1853,7 +1853,7 @@ public class VirtualStorageGroupProcessor {
// record files which are updated so that we can roll back them in case of exception
List<ModificationFile> updatedModFiles = new ArrayList<>();
-
+ boolean hasReleasedLock = false;
try {
Set<PartialPath> devicePaths = IoTDB.metaManager.getBelongedDevices(path);
for (PartialPath device : devicePaths) {
@@ -1866,20 +1866,18 @@ public class VirtualStorageGroupProcessor {
Deletion deletion = new Deletion(path, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
+ List<TsFileResource> sealedTsFileResource = new ArrayList<>();
+ List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
+ separateTsFile(sealedTsFileResource, unsealedTsFileResource);
+
deleteDataInFiles(
- tsFileManager.getTsFileList(true),
- deletion,
- devicePaths,
- updatedModFiles,
- planIndex,
- timePartitionFilter);
+ unsealedTsFileResource, deletion, devicePaths, updatedModFiles, timePartitionFilter);
+
+ writeUnlock();
+ hasReleasedLock = true;
+
deleteDataInFiles(
- tsFileManager.getTsFileList(false),
- deletion,
- devicePaths,
- updatedModFiles,
- planIndex,
- timePartitionFilter);
+ sealedTsFileResource, deletion, devicePaths, updatedModFiles, timePartitionFilter);
} catch (Exception e) {
// roll back
@@ -1890,10 +1888,37 @@ public class VirtualStorageGroupProcessor {
}
throw new IOException(e);
} finally {
- writeUnlock();
+ if (!hasReleasedLock) {
+ writeUnlock();
+ }
}
}
+ /**
+  * Splits every TsFile known to the TsFileManager into the two caller-supplied lists.
+  *
+  * <p>The list returned by getTsFileList(true) is walked first, followed by the list returned
+  * by getTsFileList(false); elements are appended in the order each list yields them.
+  *
+  * @param sealedResource receives each resource for which isClosed() is true
+  * @param unsealedResource receives each resource for which isClosed() is false
+  */
+ private void separateTsFile(
+     List<TsFileResource> sealedResource, List<TsFileResource> unsealedResource) {
+   for (boolean listFlag : new boolean[] {true, false}) {
+     tsFileManager
+         .getTsFileList(listFlag)
+         .forEach(
+             resource -> (resource.isClosed() ? sealedResource : unsealedResource).add(resource));
+   }
+ }
+
private void logDeletion(
long startTime, long endTime, PartialPath path, TimePartitionFilter timePartitionFilter)
throws IOException {
@@ -1932,10 +1957,7 @@ public class VirtualStorageGroupProcessor {
logicalStorageGroupName, tsFileResource.getTimePartition())) {
return true;
}
- if (!tsFileResource.isClosed()) {
- // tsfile is not closed
- return false;
- }
+
for (PartialPath device : devicePaths) {
String deviceId = device.getFullPath();
if (!tsFileResource.mayContainsDevice(deviceId)) {
@@ -1943,10 +1965,18 @@ public class VirtualStorageGroupProcessor {
continue;
}
- if (deleteEnd >= tsFileResource.getStartTime(deviceId)
- && deleteStart <= tsFileResource.getEndTime(deviceId)) {
- // time range of device has overlap with the deletion
- return false;
+ long deviceEndTime = tsFileResource.getEndTime(deviceId);
+ if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
+ // unsealed seq file
+ if (deleteEnd >= tsFileResource.getStartTime(deviceId)) {
+ return false;
+ }
+ } else {
+ // sealed file or unsealed unseq file
+ if (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart <= deviceEndTime) {
+ // time range of device has overlap with the deletion
+ return false;
+ }
}
}
return true;
@@ -1957,7 +1987,6 @@ public class VirtualStorageGroupProcessor {
Deletion deletion,
Set<PartialPath> devicePaths,
List<ModificationFile> updatedModFiles,
- long planIndex,
TimePartitionFilter timePartitionFilter)
throws IOException {
for (TsFileResource tsFileResource : tsFileResourceList) {
@@ -1970,35 +1999,35 @@ public class VirtualStorageGroupProcessor {
continue;
}
- if (tsFileResource.isCompacting()) {
- // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
- // change after compaction
- deletion.setFileOffset(Long.MAX_VALUE);
- // write deletion into compaction modification file
- tsFileResource.getCompactionModFile().write(deletion);
- // write deletion into modification file to enable query during compaction
- tsFileResource.getModFile().write(deletion);
- // remember to close mod file
- tsFileResource.getCompactionModFile().close();
- tsFileResource.getModFile().close();
- } else if (tsFileResource.isClosed()) {
- deletion.setFileOffset(tsFileResource.getTsFileSize());
- // write deletion into modification file
- tsFileResource.getModFile().write(deletion);
- // remember to close mod file
- tsFileResource.getModFile().close();
- }
- logger.info(
- "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
- deletion.getPath(),
- deletion.getStartTime(),
- deletion.getEndTime(),
- tsFileResource.getModFile().getFilePath());
-
- // delete data in memory of unsealed file
- if (!tsFileResource.isClosed()) {
- TsFileProcessor tsfileProcessor = tsFileResource.getProcessor();
- tsfileProcessor.deleteDataInMemory(deletion, devicePaths);
+ if (tsFileResource.isClosed()) {
+ // delete data in sealed file
+ if (tsFileResource.isCompacting()) {
+ // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
+ // change after compaction
+ deletion.setFileOffset(Long.MAX_VALUE);
+ // write deletion into compaction modification file
+ tsFileResource.getCompactionModFile().write(deletion);
+ // write deletion into modification file to enable query during compaction
+ tsFileResource.getModFile().write(deletion);
+ // remember to close mod file
+ tsFileResource.getCompactionModFile().close();
+ tsFileResource.getModFile().close();
+ } else {
+ deletion.setFileOffset(tsFileResource.getTsFileSize());
+ // write deletion into modification file
+ tsFileResource.getModFile().write(deletion);
+ // remember to close mod file
+ tsFileResource.getModFile().close();
+ }
+ logger.info(
+ "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
+ deletion.getPath(),
+ deletion.getStartTime(),
+ deletion.getEndTime(),
+ tsFileResource.getModFile().getFilePath());
+ } else {
+ // delete data in memory of unsealed file
+ tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePaths);
}
// add a record in case of rollback
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 75af3139a8..ad3ed9517b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -925,7 +925,7 @@ public class StorageGroupProcessorTest {
}
@Test
- public void testDeleteDataInFlushingMemtable()
+ public void testDeleteDataNotInFlushingMemtable()
throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
for (int j = 0; j < 100; j++) {
TSRecord record = new TSRecord(j, deviceId);
@@ -936,15 +936,96 @@ public class StorageGroupProcessorTest {
TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
- // delete data which is in memtable
+ // delete data which is not in memtable
processor.delete(new PartialPath("root.vehicle.d2.s0"), 50, 70, 0, null);
// delete data which is not in memtable
processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+ processor.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+ }
+
+ @Test
+ public void testDeleteDataInSeqFlushingMemtable()
+ throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ TsFileResource tsFileResource = processor.getTsFileResourceManager().getTsFileList(true).get(0);
+ TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+ tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
+
+ // delete data which is not in flushing memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+ // delete data which is in flushing memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+
+ processor.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertTrue(tsFileResource.getModFile().exists());
+ Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
+ }
+
+ @Test
+ public void testDeleteDataInUnSeqFlushingMemtable()
+ throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ TsFileResource tsFileResource = processor.getTsFileResourceManager().getTsFileList(true).get(0);
+
+ // delete data which is not in work memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+ // delete data which is in work memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+
+ processor.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+
+ // insert unseq data points
+ for (int j = 50; j < 100; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ // delete data which is not in work memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 200, 299, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+ // delete data which is in work memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 80, 85, 0, null);
+
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+
+ tsFileResource = processor.getTsFileResourceManager().getTsFileList(false).get(0);
+ TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+ tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
+
+ // delete data which is not in flushing memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 0, 49, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+ // delete data which is in flushing memtable
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 25, 50, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 80, 0, null);
+ processor.delete(new PartialPath("root.vehicle.d0.s0"), 99, 150, 0, null);
+
processor.syncCloseAllWorkingTsFileProcessors();
Assert.assertTrue(tsFileResource.getModFile().exists());
- Assert.assertEquals(2, tsFileResource.getModFile().getModifications().size());
+ Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
}
class DummySGP extends VirtualStorageGroupProcessor {