You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 02:59:13 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix dead lock bug

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 8189a87  fix dead lock bug
     new 9bf0540  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
8189a87 is described below

commit 8189a8731f2cf9fd7dcbdc2b99765425038972d9
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 10:57:23 2019 +0800

    fix dead lock bug
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 155 ++++++++++-----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  22 ++-
 .../iotdb/db/engine/memtable/MemTablePool.java     |   2 +-
 3 files changed, 91 insertions(+), 88 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 1cb1dc6..d7f6f6b 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -29,12 +29,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
@@ -66,9 +64,6 @@ public class FileNodeProcessorV2 {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessorV2.class);
 
-  private static final MManager mManager = MManager.getInstance();
-  private static final DirectoryManager directoryManager = DirectoryManager.getInstance();
-
   private FileSchema fileSchema;
 
   // includes sealed and unsealed sequnce tsfiles
@@ -95,7 +90,7 @@ public class FileNodeProcessorV2 {
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  private Condition closeFileNodeCondition;
+  private Object closeFileNodeCondition = new Object();
 
   /**
    * Mark whether to close file node
@@ -113,7 +108,6 @@ public class FileNodeProcessorV2 {
 
   public FileNodeProcessorV2(String baseDir, String storageGroupName) throws ProcessorException {
     this.storageGroupName = storageGroupName;
-    closeFileNodeCondition = lock.writeLock().newCondition();
 
     // construct the file schema
     this.fileSchema = constructFileSchema(storageGroupName);
@@ -124,7 +118,8 @@ public class FileNodeProcessorV2 {
     try {
       File storageGroupInfoDir = new File(baseDir, storageGroupName);
       if (storageGroupInfoDir.mkdirs()) {
-        LOGGER.info("Storage Group Info Directory {} doesn't exist, create it", storageGroupInfoDir.getPath());
+        LOGGER.info("Storage Group Info Directory {} doesn't exist, create it",
+            storageGroupInfoDir.getPath());
       }
 
       versionController = new SimpleFileVersionController(
@@ -139,32 +134,32 @@ public class FileNodeProcessorV2 {
   private void recover() throws ProcessorException {
     LOGGER.info("recover FileNodeProcessor {}", storageGroupName);
     List<File> tsFiles = new ArrayList<>();
-    List<String> seqFileFolders = directoryManager.getAllTsFileFolders();
-    for (String baseDir: seqFileFolders) {
+    List<String> seqFileFolders = DirectoryManager.getInstance().getAllTsFileFolders();
+    for (String baseDir : seqFileFolders) {
       File fileFolder = new File(baseDir, storageGroupName);
       if (!fileFolder.exists()) {
         continue;
       }
-      for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
+      for (File tsfile : fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX))) {
         tsFiles.add(tsfile);
       }
     }
     recoverSeqFiles(tsFiles);
 
     tsFiles.clear();
-    List<String> unseqFileFolder = directoryManager.getAllOverflowFileFolders();
-    for (String baseDir: unseqFileFolder) {
+    List<String> unseqFileFolder = DirectoryManager.getInstance().getAllOverflowFileFolders();
+    for (String baseDir : unseqFileFolder) {
       File fileFolder = new File(baseDir, storageGroupName);
       if (!fileFolder.exists()) {
         continue;
       }
-      for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
+      for (File tsfile : fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX))) {
         tsFiles.add(tsfile);
       }
     }
     recoverUnseqFiles(tsFiles);
 
-    for (TsFileResourceV2 resource: sequenceFileList) {
+    for (TsFileResourceV2 resource : sequenceFileList) {
       latestTimeForEachDevice.putAll(resource.getEndTimeMap());
       latestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
     }
@@ -172,7 +167,7 @@ public class FileNodeProcessorV2 {
 
   private void recoverSeqFiles(List<File> tsfiles) throws ProcessorException {
     tsfiles.sort(new CompareFileName());
-    for (File tsfile: tsfiles) {
+    for (File tsfile : tsfiles) {
       TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
       sequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
@@ -183,10 +178,11 @@ public class FileNodeProcessorV2 {
 
   private void recoverUnseqFiles(List<File> tsfiles) throws ProcessorException {
     tsfiles.sort(new CompareFileName());
-    for (File tsfile: tsfiles) {
+    for (File tsfile : tsfiles) {
       TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
       unSequenceFileList.add(tsFileResource);
-      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", fileSchema,
+      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
+          fileSchema,
           versionController, tsFileResource, true);
       recoverPerformer.recover();
     }
@@ -199,7 +195,7 @@ public class FileNodeProcessorV2 {
       String[] items1 = o1.getName().split("-");
       String[] items2 = o2.getName().split("-");
       if (Long.valueOf(items1[0]) - Long.valueOf(items2[0]) == 0) {
-        return  Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1]));
+        return Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1]));
       } else {
         return Long.compare(Long.valueOf(items1[0]), Long.valueOf(items2[0]));
       }
@@ -208,7 +204,7 @@ public class FileNodeProcessorV2 {
 
   private FileSchema constructFileSchema(String storageGroupName) {
     List<MeasurementSchema> columnSchemaList;
-    columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
+    columnSchemaList = MManager.getInstance().getSchemaForFileName(storageGroupName);
 
     FileSchema schema = new FileSchema();
     for (MeasurementSchema measurementSchema : columnSchemaList) {
@@ -237,17 +233,15 @@ public class FileNodeProcessorV2 {
    * @return -1: failed, 1: Overflow, 2:Bufferwrite
    */
   public boolean insert(InsertPlan insertPlan) {
-    lock.writeLock().lock();
-
     try {
-      if(toBeClosed){
-        throw new FileNodeProcessorException("storage group " + storageGroupName + " is to be closed, this insertion is rejected");
+      if (toBeClosed) {
+        throw new FileNodeProcessorException(
+            "storage group " + storageGroupName + " is to be closed, this insertion is rejected");
       }
       // init map
       latestTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
       latestFlushedTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
 
-      boolean result;
       // insert to sequence or unSequence file
       if (insertPlan.getTime() > latestFlushedTimeForEachDevice.get(insertPlan.getDeviceId())) {
         return insertUnsealedDataFile(insertPlan, true);
@@ -257,12 +251,11 @@ public class FileNodeProcessorV2 {
     } catch (FileNodeProcessorException | IOException e) {
       LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
       return false;
-    } finally {
-      lock.writeLock().unlock();
     }
   }
 
-  private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence) throws IOException {
+  private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence)
+      throws IOException {
     lock.writeLock().lock();
     UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
     try {
@@ -296,7 +289,7 @@ public class FileNodeProcessorV2 {
       }
 
       return result;
-    }finally {
+    } finally {
       lock.writeLock().unlock();
     }
   }
@@ -304,9 +297,9 @@ public class FileNodeProcessorV2 {
   private UnsealedTsFileProcessorV2 createTsFileProcessor(boolean sequence) throws IOException {
     String baseDir;
     if (sequence) {
-      baseDir = directoryManager.getNextFolderForSequenceFile();
+      baseDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
     } else {
-      baseDir = directoryManager.getNextFolderForUnSequenceFile();
+      baseDir = DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
     }
     new File(baseDir, storageGroupName).mkdirs();
 
@@ -321,7 +314,7 @@ public class FileNodeProcessorV2 {
     } else {
       return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
           fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback,
-          ()->true);
+          () -> true);
     }
   }
 
@@ -361,7 +354,7 @@ public class FileNodeProcessorV2 {
           if (tsFileResource.isClosed()) {
             tsfileResourcesForQuery.add(tsFileResource);
           } else {
-            Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = null;
+            Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair;
             try {
               pair = tsFileResource
                   .getUnsealedFileProcessor()
@@ -435,7 +428,8 @@ public class FileNodeProcessorV2 {
   }
 
 
-  private void deleteFiles(List<TsFileResourceV2> tsFileResourceList, Deletion deletion, List<ModificationFile> updatedModFiles)
+  private void deleteFiles(List<TsFileResourceV2> tsFileResourceList, Deletion deletion,
+      List<ModificationFile> updatedModFiles)
       throws IOException {
     String deviceId = deletion.getDevice();
     for (TsFileResourceV2 tsFileResource : tsFileResourceList) {
@@ -459,8 +453,8 @@ public class FileNodeProcessorV2 {
   }
 
   /**
-   * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the setCloseMark task
-   * will be executed by a flush thread.
+   * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
+   * setCloseMark task will be executed by a flush thread.
    *
    * only called by insert(), thread-safety should be ensured by caller
    */
@@ -513,6 +507,7 @@ public class FileNodeProcessorV2 {
 
   /**
    * when close an UnsealedTsFileProcessor, update its EndTimeMap immediately
+   *
    * @param tsFileProcessor processor to be closed
    */
   private void updateEndTimeMap(UnsealedTsFileProcessorV2 tsFileProcessor) {
@@ -526,21 +521,22 @@ public class FileNodeProcessorV2 {
   /**
    * This method will be blocked until all tsfile processors are closed.
    */
-  public void syncCloseFileNode(){
-    lock.writeLock().lock();
-    try {
-      asyncForceClose();
-      while (true) {
-        if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()) {
-          break;
+  public void syncCloseFileNode() {
+    synchronized (closeFileNodeCondition) {
+      try {
+        asyncForceClose();
+        while (true) {
+          if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor
+              .isEmpty()) {
+            break;
+          }
+          closeFileNodeCondition.wait();
         }
-        closeFileNodeCondition.await();
+      } catch (InterruptedException e) {
+        LOGGER
+            .error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
+                storageGroupName, e);
       }
-    } catch (InterruptedException e) {
-      LOGGER.error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
-          storageGroupName, e);
-    } finally {
-      lock.writeLock().unlock();
     }
   }
 
@@ -548,23 +544,24 @@ public class FileNodeProcessorV2 {
   /**
    * This method will be blocked until this file node can be closed.
    */
-  public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
-    lock.writeLock().lock();
-    try {
-      asyncForceClose();
-      toBeClosed = true;
-      while (true) {
-        if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()) {
-          removeProcessorFromManagerCallback.get();
-          break;
+  public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback) {
+    synchronized (closeFileNodeCondition) {
+      try {
+        asyncForceClose();
+        toBeClosed = true;
+        while (true) {
+          if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor
+              .isEmpty()) {
+            removeProcessorFromManagerCallback.get();
+            break;
+          }
+          closeFileNodeCondition.wait();
         }
-        closeFileNodeCondition.await();
+      } catch (InterruptedException e) {
+        LOGGER
+            .error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
+                storageGroupName, e);
       }
-    } catch (InterruptedException e) {
-      LOGGER.error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
-          storageGroupName, e);
-    } finally {
-      lock.writeLock().unlock();
     }
   }
 
@@ -586,21 +583,19 @@ public class FileNodeProcessorV2 {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and insert method.
-  public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
-    lock.writeLock().lock();
-    try {
-      // end time with one start time
-      TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
-      resource.setClosed(true);
-      if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
-        closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
-      } else {
-        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
-      }
-      LOGGER.info("signal closing file node condition");
-      closeFileNodeCondition.signal();
-    }finally {
-      lock.writeLock().unlock();
+  public void closeUnsealedTsFileProcessorCallback(
+      UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+    // end time with one start time
+    TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
+    resource.setClosed(true);
+    if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
+      closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+    } else {
+      closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+    }
+    LOGGER.info("signal closing file node condition");
+    synchronized (closeFileNodeCondition) {
+      closeFileNodeCondition.notify();
     }
   }
 
@@ -617,7 +612,7 @@ public class FileNodeProcessorV2 {
     return storageGroupName;
   }
 
-  public int getClosingProcessorSize(){
+  public int getClosingProcessorSize() {
     return unSequenceFileList.size() + sequenceFileList.size();
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 6d41d07..f16e0ab 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -100,7 +100,8 @@ public class UnsealedTsFileProcessorV2 {
     this.tsFileResource = new TsFileResourceV2(tsfile, this);
     this.versionController = versionController;
     this.writer = new NativeRestorableIOWriter(tsfile);
-    this.logNode = MultiFileLogNodeManager.getInstance().getNode(storageGroupName + "-" + tsfile.getName());
+    this.logNode = MultiFileLogNodeManager.getInstance()
+        .getNode(storageGroupName + "-" + tsfile.getName());
     this.closeUnsealedFileCallback = closeUnsealedFileCallback;
     this.flushUpdateLatestFlushTimeCallback = flushUpdateLatestFlushTimeCallback;
     LOGGER.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
@@ -153,7 +154,7 @@ public class UnsealedTsFileProcessorV2 {
         workMemTable
             .delete(deletion.getDevice(), deletion.getMeasurement(), deletion.getTimestamp());
       }
-      for (IMemTable memTable: flushingMemTables) {
+      for (IMemTable memTable : flushingMemTables) {
         memTable.delete(deletion);
       }
     } finally {
@@ -183,7 +184,8 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   public void syncClose() {
-    LOGGER.info("Synch close file: {}, first async close it", tsFileResource.getFile().getAbsolutePath());
+    LOGGER.info("Synch close file: {}, first async close it",
+        tsFileResource.getFile().getAbsolutePath());
     asyncClose();
     synchronized (flushingMemTables) {
       try {
@@ -274,7 +276,9 @@ public class UnsealedTsFileProcessorV2 {
     try {
       writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
-      LOGGER.info("flush finished, remove a memtable from flushing list, flushing memtable list size: {}", flushingMemTables.size());
+      LOGGER.info(
+          "flush finished, remove a memtable from flushing list, flushing memtable list size: {}",
+          flushingMemTables.size());
     } finally {
       flushQueryLock.writeLock().unlock();
     }
@@ -299,7 +303,8 @@ public class UnsealedTsFileProcessorV2 {
       logNode.notifyEndFlush();
       LOGGER.info("flush a memtable has finished");
     } else {
-      LOGGER.info("release an empty memtable from flushing memtable list, which is submitted in force flush");
+      LOGGER.info(
+          "release an empty memtable from flushing memtable list, which is submitted in force flush");
       releaseFlushedMemTableCallback(memTableToFlush);
     }
 
@@ -316,6 +321,7 @@ public class UnsealedTsFileProcessorV2 {
         flushingMemTables.notify();
       }
     }
+
   }
 
   private void endFile() throws IOException {
@@ -406,8 +412,10 @@ public class UnsealedTsFileProcessorV2 {
 
       ModificationFile modificationFile = tsFileResource.getModFile();
 
-      List<ChunkMetaData> chunkMetaDataList = writer.getVisibleMetadatas(deviceId, measurementId, dataType);
-      QueryUtils.modifyChunkMetaData(chunkMetaDataList, (List<Modification>) modificationFile.getModifications());
+      List<ChunkMetaData> chunkMetaDataList = writer
+          .getVisibleMetadatas(deviceId, measurementId, dataType);
+      QueryUtils.modifyChunkMetaData(chunkMetaDataList,
+          (List<Modification>) modificationFile.getModifications());
 
       return new Pair<>(timeValuePairSorter, chunkMetaDataList);
     } catch (IOException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 51936e0..4902ca6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -72,7 +72,7 @@ public class MemTablePool {
         } catch (InterruptedException e) {
           LOGGER.error("{} fails to wait fot memtables {}, continue to wait", applier, e);
         }
-        LOGGER.info("{} has waited for a memtable for {}ms", applier, waitCount * WAIT_TIME);
+        LOGGER.info("{} has waited for a memtable for {}ms", applier, waitCount++ * WAIT_TIME);
       }
     }
   }