You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/26 08:39:51 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole processin of deletion (#6745)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 0ca58726df [To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole processin of deletion (#6745)
0ca58726df is described below

commit 0ca58726dfe8d174bd75a42d4a328d8e8214d141
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Jul 26 16:39:44 2022 +0800

    [To rel/0.13][IOTDB-3777]Avoid holding sg write lock in the whole processin of deletion (#6745)
---
 .../storagegroup/VirtualStorageGroupProcessor.java | 133 +++++++++++++--------
 .../storagegroup/StorageGroupProcessorTest.java    |  87 +++++++++++++-
 2 files changed, 165 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index db39f15ce0..583bc30b32 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -1853,7 +1853,7 @@ public class VirtualStorageGroupProcessor {
 
     // record files which are updated so that we can roll back them in case of exception
     List<ModificationFile> updatedModFiles = new ArrayList<>();
-
+    boolean hasReleasedLock = false;
     try {
       Set<PartialPath> devicePaths = IoTDB.metaManager.getBelongedDevices(path);
       for (PartialPath device : devicePaths) {
@@ -1866,20 +1866,18 @@ public class VirtualStorageGroupProcessor {
 
       Deletion deletion = new Deletion(path, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
 
+      List<TsFileResource> sealedTsFileResource = new ArrayList<>();
+      List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
+      separateTsFile(sealedTsFileResource, unsealedTsFileResource);
+
       deleteDataInFiles(
-          tsFileManager.getTsFileList(true),
-          deletion,
-          devicePaths,
-          updatedModFiles,
-          planIndex,
-          timePartitionFilter);
+          unsealedTsFileResource, deletion, devicePaths, updatedModFiles, timePartitionFilter);
+
+      writeUnlock();
+      hasReleasedLock = true;
+
       deleteDataInFiles(
-          tsFileManager.getTsFileList(false),
-          deletion,
-          devicePaths,
-          updatedModFiles,
-          planIndex,
-          timePartitionFilter);
+          sealedTsFileResource, deletion, devicePaths, updatedModFiles, timePartitionFilter);
 
     } catch (Exception e) {
       // roll back
@@ -1890,10 +1888,37 @@ public class VirtualStorageGroupProcessor {
       }
       throw new IOException(e);
     } finally {
-      writeUnlock();
+      if (!hasReleasedLock) {
+        writeUnlock();
+      }
     }
   }
 
+  /** Separate TsFiles in TsFileManager into sealedResource (closed) and unsealedResource. */
+  private void separateTsFile(
+      List<TsFileResource> sealedResource, List<TsFileResource> unsealedResource) {
+    tsFileManager
+        .getTsFileList(true)
+        .forEach(
+            tsFileResource -> {
+              if (tsFileResource.isClosed()) {
+                sealedResource.add(tsFileResource);
+              } else {
+                unsealedResource.add(tsFileResource);
+              }
+            });
+    tsFileManager
+        .getTsFileList(false)
+        .forEach(
+            tsFileResource -> {
+              if (tsFileResource.isClosed()) {
+                sealedResource.add(tsFileResource);
+              } else {
+                unsealedResource.add(tsFileResource);
+              }
+            });
+  }
+
   private void logDeletion(
       long startTime, long endTime, PartialPath path, TimePartitionFilter timePartitionFilter)
       throws IOException {
@@ -1932,10 +1957,7 @@ public class VirtualStorageGroupProcessor {
             logicalStorageGroupName, tsFileResource.getTimePartition())) {
       return true;
     }
-    if (!tsFileResource.isClosed()) {
-      // tsfile is not closed
-      return false;
-    }
+
     for (PartialPath device : devicePaths) {
       String deviceId = device.getFullPath();
       if (!tsFileResource.mayContainsDevice(deviceId)) {
@@ -1943,10 +1965,18 @@ public class VirtualStorageGroupProcessor {
         continue;
       }
 
-      if (deleteEnd >= tsFileResource.getStartTime(deviceId)
-          && deleteStart <= tsFileResource.getEndTime(deviceId)) {
-        // time range of device has overlap with the deletion
-        return false;
+      long deviceEndTime = tsFileResource.getEndTime(deviceId);
+      if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
+        // unsealed seq file
+        if (deleteEnd >= tsFileResource.getStartTime(deviceId)) {
+          return false;
+        }
+      } else {
+        // sealed file or unsealed unseq file
+        if (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart <= deviceEndTime) {
+          // time range of device has overlap with the deletion
+          return false;
+        }
       }
     }
     return true;
@@ -1957,7 +1987,6 @@ public class VirtualStorageGroupProcessor {
       Deletion deletion,
       Set<PartialPath> devicePaths,
       List<ModificationFile> updatedModFiles,
-      long planIndex,
       TimePartitionFilter timePartitionFilter)
       throws IOException {
     for (TsFileResource tsFileResource : tsFileResourceList) {
@@ -1970,35 +1999,35 @@ public class VirtualStorageGroupProcessor {
         continue;
       }
 
-      if (tsFileResource.isCompacting()) {
-        // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
-        // change after compaction
-        deletion.setFileOffset(Long.MAX_VALUE);
-        // write deletion into compaction modification file
-        tsFileResource.getCompactionModFile().write(deletion);
-        // write deletion into modification file to enable query during compaction
-        tsFileResource.getModFile().write(deletion);
-        // remember to close mod file
-        tsFileResource.getCompactionModFile().close();
-        tsFileResource.getModFile().close();
-      } else if (tsFileResource.isClosed()) {
-        deletion.setFileOffset(tsFileResource.getTsFileSize());
-        // write deletion into modification file
-        tsFileResource.getModFile().write(deletion);
-        // remember to close mod file
-        tsFileResource.getModFile().close();
-      }
-      logger.info(
-          "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
-          deletion.getPath(),
-          deletion.getStartTime(),
-          deletion.getEndTime(),
-          tsFileResource.getModFile().getFilePath());
-
-      // delete data in memory of unsealed file
-      if (!tsFileResource.isClosed()) {
-        TsFileProcessor tsfileProcessor = tsFileResource.getProcessor();
-        tsfileProcessor.deleteDataInMemory(deletion, devicePaths);
+      if (tsFileResource.isClosed()) {
+        // delete data in sealed file
+        if (tsFileResource.isCompacting()) {
+          // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
+          // change after compaction
+          deletion.setFileOffset(Long.MAX_VALUE);
+          // write deletion into compaction modification file
+          tsFileResource.getCompactionModFile().write(deletion);
+          // write deletion into modification file to enable query during compaction
+          tsFileResource.getModFile().write(deletion);
+          // remember to close mod file
+          tsFileResource.getCompactionModFile().close();
+          tsFileResource.getModFile().close();
+        } else {
+          deletion.setFileOffset(tsFileResource.getTsFileSize());
+          // write deletion into modification file
+          tsFileResource.getModFile().write(deletion);
+          // remember to close mod file
+          tsFileResource.getModFile().close();
+        }
+        logger.info(
+            "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
+            deletion.getPath(),
+            deletion.getStartTime(),
+            deletion.getEndTime(),
+            tsFileResource.getModFile().getFilePath());
+      } else {
+        // delete data in memory of unsealed file
+        tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePaths);
       }
 
       // add a record in case of rollback
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 75af3139a8..ad3ed9517b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -925,7 +925,7 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testDeleteDataInFlushingMemtable()
+  public void testDeleteDataNotInFlushingMemtable()
       throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
     for (int j = 0; j < 100; j++) {
       TSRecord record = new TSRecord(j, deviceId);
@@ -936,15 +936,96 @@ public class StorageGroupProcessorTest {
     TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
     tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
 
-    // delete data which is in memtable
+    // delete data which is not in memtable
     processor.delete(new PartialPath("root.vehicle.d2.s0"), 50, 70, 0, null);
 
     // delete data which is not in memtable
     processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
 
+    processor.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertFalse(tsFileResource.getModFile().exists());
+  }
+
+  @Test
+  public void testDeleteDataInSeqFlushingMemtable()
+      throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+    TsFileResource tsFileResource = processor.getTsFileResourceManager().getTsFileList(true).get(0);
+    TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+    tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
+
+    // delete data which is not in flushing memtable (both ranges end before time 100)
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+    // delete data which is in flushing memtable (each range overlaps inserted times 100..199)
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+
+    processor.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertTrue(tsFileResource.getModFile().exists());
+    Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
+  }
+
+  @Test
+  public void testDeleteDataInUnSeqFlushingMemtable()
+      throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+    TsFileResource tsFileResource = processor.getTsFileResourceManager().getTsFileList(true).get(0);
+
+    // delete data which is not in work memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 99, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+    // delete data which is in work memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+
+    processor.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertFalse(tsFileResource.getModFile().exists());
+
+    // insert unseq data points
+    for (int j = 50; j < 100; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertRowPlan(record));
+    }
+    // delete data which is not in work memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 200, 299, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+    // delete data which is in work memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 80, 85, 0, null);
+
+    Assert.assertFalse(tsFileResource.getModFile().exists());
+
+    tsFileResource = processor.getTsFileResourceManager().getTsFileList(false).get(0);
+    TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+    tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
+
+    // delete data which is not in flushing memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 0, 49, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+    // delete data which is in flushing memtable
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 25, 50, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 50, 80, 0, null);
+    processor.delete(new PartialPath("root.vehicle.d0.s0"), 99, 150, 0, null);
+
     processor.syncCloseAllWorkingTsFileProcessors();
     Assert.assertTrue(tsFileResource.getModFile().exists());
-    Assert.assertEquals(2, tsFileResource.getModFile().getModifications().size());
+    Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
   }
 
   class DummySGP extends VirtualStorageGroupProcessor {