You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/04/28 12:39:07 UTC

[incubator-iotdb] branch fix_merge_stuck updated: add lock log

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch fix_merge_stuck
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/fix_merge_stuck by this push:
     new f1b515a  add lock log
f1b515a is described below

commit f1b515a4d3d6fe1a586dacc140d97771fce9e24c
Author: EJTTianyu <16...@qq.com>
AuthorDate: Tue Apr 28 20:37:50 2020 +0800

    add lock log
---
 .../engine/storagegroup/StorageGroupProcessor.java | 99 ++++++++++++----------
 .../iotdb/db/query/control/FileReaderManager.java  | 40 +++++----
 2 files changed, 77 insertions(+), 62 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index ac40adf..120dd58 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -200,22 +200,20 @@ public class StorageGroupProcessor {
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then files of the same
-   * version in different IoTDB instance will have identical data, providing convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files of the same version in
+   * different IoTDB instance will have identical data, providing convenience for data comparison
+   * across different instances. partition number -> max version number
    */
   private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
 
@@ -252,12 +250,14 @@ public class StorageGroupProcessor {
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
       for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
 
@@ -335,13 +335,13 @@ public class StorageGroupProcessor {
           // the process was interrupted before the merged files could be named
           continueFailedRenames(partitionFolder, MERGE_SUFFIX);
 
-        if (!partitionFolder.isDirectory()) {
-          logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
-          continue;
-        }
+          if (!partitionFolder.isDirectory()) {
+            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
+            continue;
+          }
 
-        Collections.addAll(tsFiles,
-            fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
+          Collections.addAll(tsFiles,
+              fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
         }
       }
 
@@ -463,7 +463,8 @@ public class StorageGroupProcessor {
       long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
 
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
-      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertPlan,
@@ -502,7 +503,8 @@ public class StorageGroupProcessor {
       // before is first start point
       int before = loc;
       // before time partition
-      long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+      long beforeTimePartition = StorageEngine
+          .getTimePartition(insertTabletPlan.getTimes()[before]);
       // init map
       long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
           computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -1189,7 +1191,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+  private void logDeletion(long timestamp, String deviceId, String measurementId,
+      long timePartitionId)
       throws IOException {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1430,8 +1433,10 @@ public class StorageGroupProcessor {
 
   @SuppressWarnings("squid:S1141")
   private void updateMergeModification(TsFileResource seqFile) {
-    logger.warn("start update merge mod");
+    logger.warn("start update merge mod, current File: {}, Read Lock Num: {}",
+        seqFile.getFile().toString(), seqFile.getWriteQueryLock().getReadLockCount());
     seqFile.getWriteQueryLock().writeLock().lock();
+    logger.warn("get the write query lock");
     try {
       // remove old modifications and write modifications generated during merge
       seqFile.removeModFile();
@@ -1451,6 +1456,7 @@ public class StorageGroupProcessor {
           seqFile.getFile(), e);
     } finally {
       seqFile.getWriteQueryLock().writeLock().unlock();
+      logger.warn("rel the write query lock");
       logger.warn("end update merge mod");
     }
   }
@@ -1485,7 +1491,7 @@ public class StorageGroupProcessor {
       TsFileResource seqFile = seqFiles.get(i);
       mergeLock.writeLock().lock();
       try {
-        logger.warn("start merge modification");
+        logger.warn("start merge modification, File: {}", seqFile.getFile().toString());
         updateMergeModification(seqFile);
         if (i == seqFiles.size() - 1) {
           //FIXME if there is an exception, the the modification file will be not closed.
@@ -1520,7 +1526,7 @@ public class StorageGroupProcessor {
     mergeLock.writeLock().lock();
     try {
       if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
-          newFilePartitionId)){
+          newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
@@ -1587,7 +1593,8 @@ public class StorageGroupProcessor {
       long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
-      updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+      updatePartitionFileVersion(partitionNum,
+          Collections.max(newTsFileResource.getHistoricalVersions()));
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1601,12 +1608,12 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
-   * @param newTsFileResource
-   * @param newFilePartitionId
-   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
-   *         POS_OVERLAP(-3) if some file overlaps the new file
-   *         an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+   * them.
+   *
+   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
+   * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+   * file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
       List<TsFileResource> sequenceList) {
@@ -1647,11 +1654,9 @@ public class StorageGroupProcessor {
 
   /**
    * Compare each device in the two files to find the time relation of them.
-   * @param fileA
-   * @param fileB
-   * @return -1 if fileA is totally older than fileB (A < B)
-   *          0 if fileA is partially older than fileB and partially newer than fileB (A X B)
-   *          1 if fileA is totally newer than fileB (B < A)
+   *
+   * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+   * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
     boolean hasPre = false, hasSubsequence = false;
@@ -1688,10 +1693,8 @@ public class StorageGroupProcessor {
 
   /**
    * If the historical versions of a file is a sub-set of the given file's, remove it to reduce
-   * unnecessary merge. Only used when the file sender and the receiver share the same file
-   * close policy.
-   * Warning: DO NOT REMOVE
-   * @param resource
+   * unnecessary merge. Only used when the file sender and the receiver share the same file close
+   * policy. Warning: DO NOT REMOVE
    */
   @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
@@ -1752,8 +1755,8 @@ public class StorageGroupProcessor {
    * version number is the version number in the tsfile with a larger timestamp.
    *
    * @param tsfileName origin tsfile name
-   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
-   *                   + 1]
+   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+   * 1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -1819,9 +1822,9 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
-   * @UsedBy sync module, load external tsfile module.
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
+   * @UsedBy sync module, load external tsfile module.
    */
   private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource, long filePartitionId)
@@ -1830,7 +1833,8 @@ public class StorageGroupProcessor {
     switch (type) {
       case LOAD_UNSEQUENCE:
         targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-            storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
+            storageGroupName + File.separatorChar + filePartitionId + File.separator
+                + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
         if (unSequenceFileList.contains(tsFileResource)) {
@@ -1890,7 +1894,8 @@ public class StorageGroupProcessor {
     }
     partitionDirectFileVersions.computeIfAbsent(filePartitionId,
         p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
-    updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+    updatePartitionFileVersion(filePartitionId,
+        Collections.max(tsFileResource.getHistoricalVersions()));
     return true;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index f217a79..ef727ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -37,8 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * FileReaderManager is a singleton, which is used to manage
- * all file readers(opened file streams) to ensure that each file is opened at most once.
+ * FileReaderManager is a singleton, which is used to manage all file readers(opened file streams)
+ * to ensure that each file is opened at most once.
  */
 public class FileReaderManager implements IService {
 
@@ -51,24 +51,24 @@ public class FileReaderManager implements IService {
   private static final int MAX_CACHED_FILE_SIZE = 30000;
 
   /**
-   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
-   * is the corresponding reader.
+   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
+   * corresponding reader.
    */
   private Map<String, TsFileSequenceReader> closedFileReaderMap;
   /**
-   * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
-   * is the corresponding reader.
+   * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
+   * corresponding reader.
    */
   private Map<String, TsFileSequenceReader> unclosedFileReaderMap;
 
   /**
-   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
-   * is the file's reference count.
+   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
+   * file's reference count.
    */
   private Map<String, AtomicInteger> closedReferenceMap;
   /**
-   * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
-   * is the file's reference count.
+   * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
+   * file's reference count.
    */
   private Map<String, AtomicInteger> unclosedReferenceMap;
 
@@ -131,7 +131,8 @@ public class FileReaderManager implements IService {
         iterator.remove();
         refMap.remove(entry.getKey());
         if (resourceLogger.isDebugEnabled()) {
-          resourceLogger.debug("{} TsFileReader is closed because of no reference.", entry.getKey());
+          resourceLogger
+              .debug("{} TsFileReader is closed because of no reference.", entry.getKey());
         }
       }
     }
@@ -174,12 +175,17 @@ public class FileReaderManager implements IService {
    * of a reader equals zero, the reader can be closed and removed.
    */
   void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+    logger.warn("Apply read lock");
     tsFile.getWriteQueryLock().readLock().lock();
+    logger.warn("Apply read lock to {}, current read lock num: {}", tsFile.getFile().toString(),
+        tsFile.getWriteQueryLock().getReadLockCount());
     synchronized (this) {
       if (!isClosed) {
-        unclosedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger()).getAndIncrement();
+        unclosedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger())
+            .getAndIncrement();
       } else {
-        closedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger()).getAndIncrement();
+        closedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger())
+            .getAndIncrement();
       }
     }
   }
@@ -192,11 +198,14 @@ public class FileReaderManager implements IService {
     synchronized (this) {
       if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getPath())) {
         unclosedReferenceMap.get(tsFile.getPath()).decrementAndGet();
-      } else if (closedReferenceMap.containsKey(tsFile.getPath())){
+      } else if (closedReferenceMap.containsKey(tsFile.getPath())) {
         closedReferenceMap.get(tsFile.getPath()).decrementAndGet();
       }
     }
+    logger.warn("release read lock");
     tsFile.getWriteQueryLock().readLock().unlock();
+    logger.warn("release read lock: {}, current read lock num: {}", tsFile.getFile().toString(),
+        tsFile.getWriteQueryLock().getReadLockCount());
   }
 
   /**
@@ -204,7 +213,8 @@ public class FileReaderManager implements IService {
    * integration tests will not conflict with each other.
    */
   public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
-    Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet().iterator();
+    Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet()
+        .iterator();
     while (iterator.hasNext()) {
       Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
       entry.getValue().close();