You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/26 14:50:19 UTC

[iotdb] branch to0.12closeReader created (now 4698e7d)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch to0.12closeReader
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 4698e7d  close TsFileSequenceReader immediately if ref is zero

This branch includes the following new commits:

     new 4698e7d  close TsFileSequenceReader immediately if ref is zero

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: close TsFileSequenceReader immediately if ref is zero

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12closeReader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4698e7d135750b4391fc09943c3483c6b2a19e8b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Nov 26 22:49:35 2021 +0800

    close TsFileSequenceReader immediately if ref is zero
---
 .../iotdb/db/query/control/FileReaderManager.java  | 112 ++++++---------------
 .../iotdb/db/engine/cache/ChunkCacheTest.java      |   5 +-
 .../compaction/LevelCompactionRecoverTest.java     |   4 +-
 .../compaction/LevelCompactionRestoreTest.java     |   1 -
 .../db/engine/compaction/LevelCompactionTest.java  |   1 -
 .../apache/iotdb/db/engine/merge/MergeTest.java    |   1 -
 .../db/query/control/FileReaderManagerTest.java    |  19 +---
 .../query/reader/series/SeriesReaderTestUtil.java  |   1 -
 8 files changed, 42 insertions(+), 102 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index cf329af..0046bf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -18,11 +18,7 @@
  */
 package org.apache.iotdb.db.query.control;
 
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.UnClosedTsFileReader;
@@ -35,15 +31,13 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FileReaderManager is a singleton, which is used to manage all file readers(opened file streams)
  * to ensure that each file is opened at most once.
  */
-public class FileReaderManager implements IService {
+public class FileReaderManager {
 
   private static final Logger logger = LoggerFactory.getLogger(FileReaderManager.class);
   private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
@@ -73,16 +67,11 @@ public class FileReaderManager implements IService {
    */
   private Map<String, AtomicInteger> unclosedReferenceMap;
 
-  private ScheduledExecutorService executorService;
-
   private FileReaderManager() {
     closedFileReaderMap = new ConcurrentHashMap<>();
     unclosedFileReaderMap = new ConcurrentHashMap<>();
     closedReferenceMap = new ConcurrentHashMap<>();
     unclosedReferenceMap = new ConcurrentHashMap<>();
-    executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "open-files-manager");
-
-    clearUnUsedFilesInFixTime();
   }
 
   public static FileReaderManager getInstance() {
@@ -102,46 +91,6 @@ public class FileReaderManager implements IService {
     }
   }
 
-  private void clearUnUsedFilesInFixTime() {
-
-    long examinePeriod = IoTDBDescriptor.getInstance().getConfig().getCacheFileReaderClearPeriod();
-
-    executorService.scheduleAtFixedRate(
-        () -> {
-          synchronized (this) {
-            clearMap(closedFileReaderMap, closedReferenceMap);
-            clearMap(unclosedFileReaderMap, unclosedReferenceMap);
-          }
-        },
-        0,
-        examinePeriod,
-        TimeUnit.MILLISECONDS);
-  }
-
-  private void clearMap(
-      Map<String, TsFileSequenceReader> readerMap, Map<String, AtomicInteger> refMap) {
-    Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
-      TsFileSequenceReader reader = entry.getValue();
-      AtomicInteger refAtom = refMap.get(entry.getKey());
-
-      if (refAtom != null && refAtom.get() == 0) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
-        }
-        iterator.remove();
-        refMap.remove(entry.getKey());
-        if (resourceLogger.isDebugEnabled()) {
-          resourceLogger.debug(
-              "{} TsFileReader is closed because of no reference.", entry.getKey());
-        }
-      }
-    }
-  }
-
   /**
    * Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
    * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
@@ -213,14 +162,44 @@ public class FileReaderManager implements IService {
   void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
     synchronized (this) {
       if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        if (unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) {
+          closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), false);
+        }
       } else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        if (closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) {
+          closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), true);
+        }
       }
     }
     tsFile.readUnlock();
   }
 
+  private void closeUnUsedReaderAndRemoveRef(String tsFilePath, boolean isClosed) {
+    Map<String, TsFileSequenceReader> readerMap =
+        isClosed ? closedFileReaderMap : unclosedFileReaderMap;
+    Map<String, AtomicInteger> refMap = isClosed ? closedReferenceMap : unclosedReferenceMap;
+    synchronized (this) {
+      // check ref num again
+      if (refMap.get(tsFilePath).get() != 0) {
+        return;
+      }
+
+      TsFileSequenceReader reader = readerMap.get(tsFilePath);
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
+        }
+      }
+      readerMap.remove(tsFilePath);
+      refMap.remove(tsFilePath);
+      if (resourceLogger.isDebugEnabled()) {
+        resourceLogger.debug("{} TsFileReader is closed because of no reference.", tsFilePath);
+      }
+    }
+  }
+
   /**
    * Only for <code>EnvironmentUtils.cleanEnv</code> method. To make sure that unit tests and
    * integration tests will not conflict with each other.
@@ -255,31 +234,6 @@ public class FileReaderManager implements IService {
         || (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFilePath()));
   }
 
-  @Override
-  public void start() {
-    // Do nothing
-  }
-
-  @Override
-  public void stop() {
-    if (executorService == null || executorService.isShutdown()) {
-      return;
-    }
-
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      logger.error("StatMonitor timing service could not be shutdown.", e);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.FILE_READER_MANAGER_SERVICE;
-  }
-
   private static class FileReaderManagerHelper {
 
     private static final FileReaderManager INSTANCE = new FileReaderManager();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
index 1a0d25d..e374f14 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
@@ -50,7 +50,9 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
 import static org.junit.Assert.assertTrue;
@@ -251,6 +253,5 @@ public class ChunkCacheTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index d26f1c2..0d2d64f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -74,7 +74,8 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class LevelCompactionRecoverTest {
 
@@ -188,7 +189,6 @@ public class LevelCompactionRecoverTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRestoreTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRestoreTest.java
index f884448..8c42dd5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRestoreTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRestoreTest.java
@@ -183,7 +183,6 @@ public class LevelCompactionRestoreTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index d614144..3a8f468 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -206,7 +206,6 @@ abstract class LevelCompactionTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 699e3a8..18970d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -199,7 +199,6 @@ abstract class MergeTest {
     }
 
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index ea5b75b..663b6d8 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -123,25 +123,14 @@ public class FileReaderManagerTest {
     t1.join();
     t2.join();
 
+    Thread.sleep(1000);
+    // Since we have closed the reader after reading the file, it should be false that the file is
+    // still contained by manager
     for (int i = 1; i <= MAX_FILE_SIZE; i++) {
       TsFileResource tsFile = new TsFileResource(SystemFileFactory.INSTANCE.getFile(filePath + i));
-      Assert.assertTrue(manager.contains(tsFile, false));
+      Assert.assertFalse(manager.contains(tsFile, false));
     }
 
-    // the code below is not valid because the cacheFileReaderClearPeriod config in this class is
-    // not valid
-
-    // TimeUnit.SECONDS.sleep(5);
-    //
-    // for (int i = 1; i <= MAX_FILE_SIZE; i++) {
-    //
-    // if (i == 4 || i == 5 || i == 6) {
-    // Assert.assertTrue(manager.contains(filePath + i));
-    // } else {
-    // Assert.assertFalse(manager.contains(filePath + i));
-    // }
-    // }
-
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
     for (int i = 1; i < MAX_FILE_SIZE; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(filePath + i);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index f865ee3..142e309 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -235,6 +235,5 @@ public class SeriesReaderTestUtil {
     }
 
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 }