You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/06/04 09:04:49 UTC

[iotdb] branch ShowLockInfo created (now 3932c98)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ShowLockInfo
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 3932c98  format code

This branch includes the following new commits:

     new 3932c98  format code

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: format code

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ShowLockInfo
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3932c9852b8ecb8092ee8fde73d84936121b2ac1
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Fri Jun 4 17:04:00 2021 +0800

    format code
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |  5 ++
 .../engine/storagegroup/StorageGroupProcessor.java | 72 ++++++++++++----------
 .../virtualSg/VirtualStorageGroupManager.java      | 14 ++---
 3 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 18539c7..9d05e7a 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -76,6 +76,7 @@ statement
     | SHOW VERSION #showVersion
     | SHOW LATEST? TIMESERIES prefixPath? showWhereClause? limitClause? #showTimeseries
     | SHOW STORAGE GROUP prefixPath? #showStorageGroup
+    | SHOW LOCK INFO prefixPath? #showLockInfo
     | SHOW CHILD PATHS prefixPath? #showChildPaths
     | SHOW CHILD NODES prefixPath? #showChildNodes
     | SHOW DEVICES prefixPath? (WITH STORAGE GROUP)? limitClause? #showDevices
@@ -1235,6 +1236,10 @@ ANY
     : A N Y
     ;
 
+LOCK
+    : L O C K
+    ;
+
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 26261df..1e75eea 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -272,6 +272,8 @@ public class StorageGroupProcessor {
   // DEFAULT_POOL_TRIM_INTERVAL_MILLIS
   private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
 
+  private String insertWriteLockHolder = "";
+
   /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
@@ -798,7 +800,7 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
-    writeLock();
+    writeLock("InsertRow");
     try {
       // init map
       long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
@@ -840,7 +842,7 @@ public class StorageGroupProcessor {
   public void insertTablet(InsertTabletPlan insertTabletPlan)
       throws BatchProcessException, TriggerExecutionException {
 
-    writeLock();
+    writeLock("insertTablet");
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
@@ -1122,7 +1124,7 @@ public class StorageGroupProcessor {
   }
 
   public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
-    writeLock();
+    writeLock("submitAFlushTaskWhenShouldFlush");
     try {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor.shouldFlush()) {
@@ -1342,9 +1344,9 @@ public class StorageGroupProcessor {
         "{} will close all files for deleting data folder {}",
         logicalStorageGroupName + "-" + virtualStorageGroupId,
         systemDir);
-    writeLock();
-    syncCloseAllWorkingTsFileProcessors();
+    writeLock("deleteFolder");
     try {
+      syncCloseAllWorkingTsFileProcessors();
       File storageGroupFolder =
           SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId);
       if (storageGroupFolder.exists()) {
@@ -1385,21 +1387,16 @@ public class StorageGroupProcessor {
     logger.info(
         "{} will close all files for deleting data files",
         logicalStorageGroupName + "-" + virtualStorageGroupId);
-    writeLock();
-    syncCloseAllWorkingTsFileProcessors();
-    // normally, mergingModification is just need to be closed by after a merge task is finished.
-    // we close it here just for IT test.
-    if (this.tsFileManagement.mergingModification != null) {
-      try {
+    writeLock("syncDeleteDataFiles");
+    try {
+
+      syncCloseAllWorkingTsFileProcessors();
+      // normally, mergingModification only needs to be closed after a merge task is finished.
+      // we close it here just for the integration tests (IT).
+      if (this.tsFileManagement.mergingModification != null) {
         this.tsFileManagement.mergingModification.close();
-      } catch (IOException e) {
-        logger.error(
-            "Cannot close the mergingMod file {}",
-            this.tsFileManagement.mergingModification.getFilePath(),
-            e);
       }
-    }
-    try {
+
       closeAllResources();
       List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
       folder.addAll(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
@@ -1411,6 +1408,11 @@ public class StorageGroupProcessor {
       this.partitionLatestFlushedTimeForEachDevice.clear();
       this.globalLatestFlushedTimeForEachDevice.clear();
       this.latestTimeForEachDevice.clear();
+    } catch (IOException e) {
+      logger.error(
+          "Cannot close the mergingMod file {}",
+          this.tsFileManagement.mergingModification.getFilePath(),
+          e);
     } finally {
       writeUnlock();
     }
@@ -1462,7 +1464,7 @@ public class StorageGroupProcessor {
       return;
     }
 
-    writeLock();
+    writeLock("checkFileTTL");
     try {
       // prevent new merges and queries from choosing this file
       resource.setDeleted(true);
@@ -1522,7 +1524,7 @@ public class StorageGroupProcessor {
   }
 
   public void asyncCloseAllWorkingTsFileProcessors() {
-    writeLock();
+    writeLock("asyncCloseAllWorkingTsFileProcessors");
     try {
       logger.info(
           "async force close all files in storage group: {}",
@@ -1543,7 +1545,7 @@ public class StorageGroupProcessor {
   }
 
   public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
-    writeLock();
+    writeLock("forceCloseAllWorkingTsFileProcessors");
     try {
       logger.info(
           "force close all processors in storage group: {}",
@@ -1612,11 +1614,13 @@ public class StorageGroupProcessor {
     insertLock.readLock().unlock();
   }
 
-  public void writeLock() {
+  public void writeLock(String holder) {
     insertLock.writeLock().lock();
+    insertWriteLockHolder = holder;
   }
 
   public void writeUnlock() {
+    insertWriteLockHolder = "";
     insertLock.writeLock().unlock();
   }
 
@@ -1705,7 +1709,7 @@ public class StorageGroupProcessor {
     // TODO: how to avoid partial deletion?
     // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
     // mod files in mergingModification, sequenceFileList, and unsequenceFileList
-    writeLock();
+    writeLock("delete");
 
     // record files which are updated so that we can roll back them in case of exception
     List<ModificationFile> updatedModFiles = new ArrayList<>();
@@ -2049,7 +2053,7 @@ public class StorageGroupProcessor {
     upgradeFileCount.getAndAdd(-1);
     // load all upgraded resources in this sg to tsFileManagement
     if (upgradeFileCount.get() == 0) {
-      writeLock();
+      writeLock("upgradeTsFileResourceCallBack");
       try {
         loadUpgradedResources(upgradeSeqFileList, true);
         loadUpgradedResources(upgradeUnseqFileList, false);
@@ -2111,7 +2115,7 @@ public class StorageGroupProcessor {
   }
 
   public void merge(boolean isFullMerge) {
-    writeLock();
+    writeLock("merge");
     try {
       for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
         executeCompaction(timePartitionId, isFullMerge);
@@ -2135,7 +2139,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    writeLock();
+    writeLock("loadNewTsFileForSync");
     try {
       if (loadTsFileByType(
           LoadTsFileType.LOAD_SEQUENCE,
@@ -2205,7 +2209,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    writeLock();
+    writeLock("loadNewTsFile");
     try {
       List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
 
@@ -2381,7 +2385,7 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
-    writeLock();
+    writeLock("removeFullyOverlapFiles");
     try {
       Iterator<TsFileResource> iterator = tsFileManagement.getIterator(true);
       removeFullyOverlapFiles(resource, iterator, true);
@@ -2654,7 +2658,7 @@ public class StorageGroupProcessor {
    *     module.
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
-    writeLock();
+    writeLock("deleteTsfile");
     TsFileResource tsFileResourceToBeDeleted = null;
     try {
       Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true);
@@ -2708,7 +2712,7 @@ public class StorageGroupProcessor {
    * @return whether the file to be moved exists. @UsedBy load external tsfile module.
    */
   public boolean moveTsfile(File fileToBeMoved, File targetDir) {
-    writeLock();
+    writeLock("moveTsfile");
     TsFileResource tsFileResourceToBeMoved = null;
     try {
       Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true);
@@ -2839,7 +2843,7 @@ public class StorageGroupProcessor {
   /** remove all partitions that satisfy a filter. */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
-    writeLock();
+    writeLock("removePartitions");
     try {
       // abort ongoing comapctions and merges
       CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
@@ -2896,7 +2900,7 @@ public class StorageGroupProcessor {
 
   public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
       throws WriteProcessException, TriggerExecutionException {
-    writeLock();
+    writeLock("InsertRowsOfOneDevice");
     try {
       boolean isSequence = false;
       InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
@@ -2989,4 +2993,8 @@ public class StorageGroupProcessor {
 
     boolean satisfy(String storageGroupName, long timePartitionId);
   }
+
+  public String getInsertWriteLockHolder() {
+    return insertWriteLockHolder;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index e8d1f77..b7c86b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -201,7 +201,7 @@ public class VirtualStorageGroupManager {
             isSeq);
       }
 
-      processor.writeLock();
+      processor.writeLock("VirtualCloseStorageGroupProcessor-204");
       try {
         if (isSeq) {
           // to avoid concurrent modification problem, we need a new array list
@@ -239,13 +239,13 @@ public class VirtualStorageGroupManager {
             processor.getVirtualStorageGroupId() + "-" + processor.getLogicalStorageGroupName(),
             isSeq,
             partitionId);
-        processor.writeLock();
-        // to avoid concurrent modification problem, we need a new array list
-        List<TsFileProcessor> processors =
-            isSeq
-                ? new ArrayList<>(processor.getWorkSequenceTsFileProcessors())
-                : new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
+        processor.writeLock("VirtualCloseStorageGroupProcessor-242");
         try {
+          // to avoid concurrent modification problem, we need a new array list
+          List<TsFileProcessor> processors =
+              isSeq
+                  ? new ArrayList<>(processor.getWorkSequenceTsFileProcessors())
+                  : new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
           for (TsFileProcessor tsfileProcessor : processors) {
             if (tsfileProcessor.getTimeRangeId() == partitionId) {
               if (isSync) {