You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/06/04 07:20:31 UTC

[iotdb] 01/03: init

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DeadLockTryFix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ea299280671168370c50c427d4e3541c959a577
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Jun 3 20:35:20 2021 +0800

    init
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   5 +
 .../engine/storagegroup/StorageGroupProcessor.java | 138 +++++++++++----------
 .../virtualSg/VirtualStorageGroupManager.java      |  14 +--
 3 files changed, 84 insertions(+), 73 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 54d9aab..2557190 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -76,6 +76,7 @@ statement
     | SHOW VERSION #showVersion
     | SHOW LATEST? TIMESERIES prefixPath? showWhereClause? limitClause? #showTimeseries
     | SHOW STORAGE GROUP prefixPath? #showStorageGroup
+    | SHOW LOCK INFO prefixPath? #showLockInfo
     | SHOW CHILD PATHS prefixPath? #showChildPaths
     | SHOW CHILD NODES prefixPath? #showChildNodes
     | SHOW DEVICES prefixPath? (WITH STORAGE GROUP)? limitClause? #showDevices
@@ -1310,6 +1311,10 @@ ANY
     : A N Y
     ;
 
+LOCK
+    : L O C K
+    ;
+
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7427f98..8ae58b2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,38 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
  * TsFileProcessor in the working status. <br>
@@ -268,6 +266,8 @@ public class StorageGroupProcessor {
   // DEFAULT_POOL_TRIM_INTERVAL_MILLIS
   private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
 
+  private String insertWriteLockHolder = "";
+
   /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
@@ -793,7 +793,7 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
-    writeLock();
+    writeLock("InsertRow");
     try {
       // init map
       long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
@@ -829,7 +829,7 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchProcessException {
-    writeLock();
+    writeLock("insertTablet");
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
@@ -1087,7 +1087,7 @@ public class StorageGroupProcessor {
   }
 
   public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
-    writeLock();
+    writeLock("submitAFlushTaskWhenShouldFlush");
     try {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor.shouldFlush()) {
@@ -1307,9 +1307,9 @@ public class StorageGroupProcessor {
         "{} will close all files for deleting data folder {}",
         logicalStorageGroupName + "-" + virtualStorageGroupId,
         systemDir);
-    writeLock();
-    syncCloseAllWorkingTsFileProcessors();
+    writeLock("deleteFolder");
     try {
+      syncCloseAllWorkingTsFileProcessors();
       File storageGroupFolder =
           SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId);
       if (storageGroupFolder.exists()) {
@@ -1350,21 +1350,16 @@ public class StorageGroupProcessor {
     logger.info(
         "{} will close all files for deleting data files",
         logicalStorageGroupName + "-" + virtualStorageGroupId);
-    writeLock();
-    syncCloseAllWorkingTsFileProcessors();
-    // normally, mergingModification is just need to be closed by after a merge task is finished.
-    // we close it here just for IT test.
-    if (this.tsFileManagement.mergingModification != null) {
-      try {
+    writeLock("syncDeleteDataFiles");
+    try {
+
+      syncCloseAllWorkingTsFileProcessors();
+      // normally, mergingModification is just need to be closed by after a merge task is finished.
+      // we close it here just for IT test.
+      if (this.tsFileManagement.mergingModification != null) {
         this.tsFileManagement.mergingModification.close();
-      } catch (IOException e) {
-        logger.error(
-            "Cannot close the mergingMod file {}",
-            this.tsFileManagement.mergingModification.getFilePath(),
-            e);
       }
-    }
-    try {
+
       closeAllResources();
       List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
       folder.addAll(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
@@ -1376,6 +1371,11 @@ public class StorageGroupProcessor {
       this.partitionLatestFlushedTimeForEachDevice.clear();
       this.globalLatestFlushedTimeForEachDevice.clear();
       this.latestTimeForEachDevice.clear();
+    } catch (IOException e) {
+      logger.error(
+          "Cannot close the mergingMod file {}",
+          this.tsFileManagement.mergingModification.getFilePath(),
+          e);
     } finally {
       writeUnlock();
     }
@@ -1427,7 +1427,7 @@ public class StorageGroupProcessor {
       return;
     }
 
-    writeLock();
+    writeLock("checkFileTTL");
     try {
       // prevent new merges and queries from choosing this file
       resource.setDeleted(true);
@@ -1487,7 +1487,7 @@ public class StorageGroupProcessor {
   }
 
   public void asyncCloseAllWorkingTsFileProcessors() {
-    writeLock();
+    writeLock("asyncCloseAllWorkingTsFileProcessors");
     try {
       logger.info(
           "async force close all files in storage group: {}",
@@ -1508,7 +1508,7 @@ public class StorageGroupProcessor {
   }
 
   public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
-    writeLock();
+    writeLock("forceCloseAllWorkingTsFileProcessors");
     try {
       logger.info(
           "force close all processors in storage group: {}",
@@ -1580,11 +1580,13 @@ public class StorageGroupProcessor {
     insertLock.readLock().unlock();
   }
 
-  public void writeLock() {
+  public void writeLock(String holder) {
     insertLock.writeLock().lock();
+    insertWriteLockHolder = holder;
   }
 
   public void writeUnlock() {
+    insertWriteLockHolder = "";
     insertLock.writeLock().unlock();
   }
 
@@ -1681,7 +1683,7 @@ public class StorageGroupProcessor {
     // TODO: how to avoid partial deletion?
     // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
     // mod files in mergingModification, sequenceFileList, and unsequenceFileList
-    writeLock();
+    writeLock("delete");
 
     // record files which are updated so that we can roll back them in case of exception
     List<ModificationFile> updatedModFiles = new ArrayList<>();
@@ -2025,7 +2027,7 @@ public class StorageGroupProcessor {
     upgradeFileCount.getAndAdd(-1);
     // load all upgraded resources in this sg to tsFileManagement
     if (upgradeFileCount.get() == 0) {
-      writeLock();
+      writeLock("upgradeTsFileResourceCallBack");
       try {
         loadUpgradedResources(upgradeSeqFileList, true);
         loadUpgradedResources(upgradeUnseqFileList, false);
@@ -2087,7 +2089,7 @@ public class StorageGroupProcessor {
   }
 
   public void merge(boolean isFullMerge) {
-    writeLock();
+    writeLock("merge");
     try {
       for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
         executeCompaction(timePartitionId, isFullMerge);
@@ -2111,7 +2113,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    writeLock();
+    writeLock("loadNewTsFileForSync");
     try {
       if (loadTsFileByType(
           LoadTsFileType.LOAD_SEQUENCE,
@@ -2181,7 +2183,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    writeLock();
+    writeLock("loadNewTsFile");
     try {
       List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
 
@@ -2357,7 +2359,7 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
-    writeLock();
+    writeLock("removeFullyOverlapFiles");
     try {
       Iterator<TsFileResource> iterator = tsFileManagement.getIterator(true);
       removeFullyOverlapFiles(resource, iterator, true);
@@ -2630,7 +2632,7 @@ public class StorageGroupProcessor {
    *     module.
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
-    writeLock();
+    writeLock("deleteTsfile");
     TsFileResource tsFileResourceToBeDeleted = null;
     try {
       Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true);
@@ -2684,7 +2686,7 @@ public class StorageGroupProcessor {
    * @return whether the file to be moved exists. @UsedBy load external tsfile module.
    */
   public boolean moveTsfile(File fileToBeMoved, File targetDir) {
-    writeLock();
+    writeLock("moveTsfile");
     TsFileResource tsFileResourceToBeMoved = null;
     try {
       Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true);
@@ -2815,7 +2817,7 @@ public class StorageGroupProcessor {
   /** remove all partitions that satisfy a filter. */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
-    writeLock();
+    writeLock("removePartitions");
     try {
       // abort ongoing comapctions and merges
       CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
@@ -2872,7 +2874,7 @@ public class StorageGroupProcessor {
 
   public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
       throws WriteProcessException {
-    writeLock();
+    writeLock("InsertRowsOfOneDevice");
     try {
       boolean isSequence = false;
       InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
@@ -2960,4 +2962,8 @@ public class StorageGroupProcessor {
 
     boolean satisfy(String storageGroupName, long timePartitionId);
   }
+
+  public String getInsertWriteLockHolder() {
+    return insertWriteLockHolder;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index e8d1f77..b7c86b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -201,7 +201,7 @@ public class VirtualStorageGroupManager {
             isSeq);
       }
 
-      processor.writeLock();
+      processor.writeLock("VirtualCloseStorageGroupProcessor-204");
       try {
         if (isSeq) {
           // to avoid concurrent modification problem, we need a new array list
@@ -239,13 +239,13 @@ public class VirtualStorageGroupManager {
             processor.getVirtualStorageGroupId() + "-" + processor.getLogicalStorageGroupName(),
             isSeq,
             partitionId);
-        processor.writeLock();
-        // to avoid concurrent modification problem, we need a new array list
-        List<TsFileProcessor> processors =
-            isSeq
-                ? new ArrayList<>(processor.getWorkSequenceTsFileProcessors())
-                : new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
+        processor.writeLock("VirtualCloseStorageGroupProcessor-242");
         try {
+          // to avoid concurrent modification problem, we need a new array list
+          List<TsFileProcessor> processors =
+              isSeq
+                  ? new ArrayList<>(processor.getWorkSequenceTsFileProcessors())
+                  : new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
           for (TsFileProcessor tsfileProcessor : processors) {
             if (tsfileProcessor.getTimeRangeId() == partitionId) {
               if (isSync) {