You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/02/28 09:37:22 UTC

[incubator-iotdb] branch fix_30 updated: fix by comments

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_30
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/fix_30 by this push:
     new a33640f  fix by comments
a33640f is described below

commit a33640fc94f128f744ebbd83f65f039629ffd31d
Author: 江天 <jt...@163.com>
AuthorDate: Thu Feb 28 17:36:24 2019 +0800

    fix by comments
---
 .../iotdb/db/query/control/FileReaderManager.java  | 41 +++++++++++++---------
 .../db/query/control/OpenedFilePathsManager.java   | 19 +++++-----
 .../db/query/factory/SeriesReaderFactory.java      |  4 +--
 .../query/reader/sequence/SealedTsFilesReader.java |  2 +-
 .../reader/sequence/UnSealedTsFileReader.java      |  2 +-
 .../db/query/control/FileReaderManagerTest.java    | 14 ++++----
 6 files changed, 46 insertions(+), 36 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 5a22718..95c21ca 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -47,17 +47,25 @@ public class FileReaderManager implements IService {
   private static final int MAX_CACHED_FILE_SIZE = 30000;
 
   /**
-   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the corresponding
-   * reader.
+   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
+   * is the corresponding reader.
    */
   private ConcurrentHashMap<String, TsFileSequenceReader> closedFileReaderMap;
+  /**
+   * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
+   * is the corresponding reader.
+   */
   private ConcurrentHashMap<String, TsFileSequenceReader> unclosedFileReaderMap;
 
   /**
-   * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the file's
-   * reference count.
+   * the key of closedReferenceMap is the file path and the value of closedReferenceMap
+   * is the file's reference count.
    */
   private ConcurrentHashMap<String, AtomicInteger> closedReferenceMap;
+  /**
+   * the key of unclosedReferenceMap is the file path and the value of unclosedReferenceMap
+   * is the file's reference count.
+   */
   private ConcurrentHashMap<String, AtomicInteger> unclosedReferenceMap;
 
   private ScheduledExecutorService executorService;
@@ -109,17 +117,18 @@ public class FileReaderManager implements IService {
 
   /**
    * Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
-   * exists, just get it from closedFileReaderMap. Otherwise a new reader will be created.
+   * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosed.
+   * Otherwise a new reader will be created and cached.
    *
    * @param filePath the path of the file, of which the reader is desired.
-   * @param isUnClosed whether the corresponding file still receives insertions or not.
+   * @param isClosed whether the corresponding file is closed (no longer receives insertions).
    * @return the reader of the file specified by filePath.
    * @throws IOException when reader cannot be created.
    */
-  public synchronized TsFileSequenceReader get(String filePath, boolean isUnClosed)
+  public synchronized TsFileSequenceReader get(String filePath, boolean isClosed)
       throws IOException {
 
-    Map<String, TsFileSequenceReader> readerMap = isUnClosed ? unclosedFileReaderMap
+    Map<String, TsFileSequenceReader> readerMap = !isClosed ? unclosedFileReaderMap
         : closedFileReaderMap;
     if (!readerMap.containsKey(filePath)) {
 
@@ -127,7 +136,7 @@ public class FileReaderManager implements IService {
         LOGGER.warn("Query has opened {} files !", readerMap.size());
       }
 
-      TsFileSequenceReader tsFileReader = isUnClosed ? new UnClosedTsFileReader(filePath)
+      TsFileSequenceReader tsFileReader = !isClosed ? new UnClosedTsFileReader(filePath)
           : new TsFileSequenceReader(filePath);
 
       readerMap.put(filePath, tsFileReader);
@@ -141,8 +150,8 @@ public class FileReaderManager implements IService {
    * Increase the reference count of the reader specified by filePath. Only when the reference count
    * of a reader equals zero, the reader can be closed and removed.
    */
-  public synchronized void increaseFileReaderReference(String filePath, boolean isUnClosed) {
-    if (isUnClosed) {
+  public synchronized void increaseFileReaderReference(String filePath, boolean isClosed) {
+    if (!isClosed) {
       unclosedReferenceMap.computeIfAbsent(filePath, k -> new AtomicInteger()).getAndIncrement();
     } else {
       closedReferenceMap.computeIfAbsent(filePath, k -> new AtomicInteger()).getAndIncrement();
@@ -153,8 +162,8 @@ public class FileReaderManager implements IService {
    * Decrease the reference count of the reader specified by filePath. This method is latch-free.
    * Only when the reference count of a reader equals zero, the reader can be closed and removed.
    */
-  public synchronized void decreaseFileReaderReference(String filePath, boolean isUnclosed) {
-    if (isUnclosed) {
+  public synchronized void decreaseFileReaderReference(String filePath, boolean isClosed) {
+    if (!isClosed) {
       unclosedReferenceMap.get(filePath).getAndDecrement();
     } else {
       closedReferenceMap.get(filePath).getAndDecrement();
@@ -198,9 +207,9 @@ public class FileReaderManager implements IService {
   /**
    * This method is only for unit tests.
    */
-  public synchronized boolean contains(String filePath, boolean isUnclosed) {
-    return (!isUnclosed && closedFileReaderMap.containsKey(filePath))
-        || (isUnclosed && unclosedFileReaderMap.containsKey(filePath));
+  public synchronized boolean contains(String filePath, boolean isClosed) {
+    return (isClosed && closedFileReaderMap.containsKey(filePath))
+        || (!isClosed && unclosedFileReaderMap.containsKey(filePath));
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 6bbb691..b2c1452 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -63,24 +63,24 @@ public class OpenedFilePathsManager {
   }
 
   /**
-   * Add the unique file paths to closedFilePathsMap.
+   * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap.
    */
   public void addUsedFilesForCurrentRequestThread(long jobId, QueryDataSource dataSource) {
     for (IntervalFileNode intervalFileNode : dataSource.getSeqDataSource().getSealedTsFiles()) {
       String sealedFilePath = intervalFileNode.getFilePath();
-      addFilePathToMap(jobId, sealedFilePath, false);
+      addFilePathToMap(jobId, sealedFilePath, true);
     }
 
     if (dataSource.getSeqDataSource().hasUnsealedTsFile()) {
       String unSealedFilePath = dataSource.getSeqDataSource().getUnsealedTsFile().getFilePath();
-      addFilePathToMap(jobId, unSealedFilePath, true);
+      addFilePathToMap(jobId, unSealedFilePath, false);
     }
 
     for (OverflowInsertFile overflowInsertFile : dataSource.getOverflowSeriesDataSource()
         .getOverflowInsertFileList()) {
       String overflowFilePath = overflowInsertFile.getFilePath();
       // overflow is unclosed by default
-      addFilePathToMap(jobId, overflowFilePath, true);
+      addFilePathToMap(jobId, overflowFilePath, false);
     }
   }
 
@@ -106,15 +106,16 @@ public class OpenedFilePathsManager {
 
   /**
    * Increase the usage reference of filePath of job id. Before the invoking of this method,
-   * <code>this.setJobIdForCurrentRequestThread</code> has been invoked, so <code>closedFilePathsMap.get(jobId)</code> must
-   * not return null.
+   * <code>this.setJobIdForCurrentRequestThread</code> has been invoked,
+   * so <code>closedFilePathsMap.get(jobId)</code> or <code>unclosedFilePathsMap.get(jobId)</code>
+   * must not return null.
    */
-  public void addFilePathToMap(long jobId, String filePath, boolean isUnclosed) {
-    ConcurrentHashMap<Long, Set<String>> pathMap = isUnclosed ? unclosedFilePathsMap :
+  public void addFilePathToMap(long jobId, String filePath, boolean isClosed) {
+    ConcurrentHashMap<Long, Set<String>> pathMap = !isClosed ? unclosedFilePathsMap :
         closedFilePathsMap;
     if (!pathMap.get(jobId).contains(filePath)) {
       pathMap.get(jobId).add(filePath);
-      FileReaderManager.getInstance().increaseFileReaderReference(filePath, isUnclosed);
+      FileReaderManager.getInstance().increaseFileReaderReference(filePath, isClosed);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 4854479..51ce94b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -81,7 +81,7 @@ public class SeriesReaderFactory {
 
       // store only one opened file stream into manager, to avoid too many opened files
       TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
-          .get(overflowInsertFile.getFilePath(), true);
+          .get(overflowInsertFile.getFilePath(), false);
 
       ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
 
@@ -161,7 +161,7 @@ public class SeriesReaderFactory {
       QueryContext context)
       throws IOException {
     TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
-        .get(fileNode.getFilePath(), false);
+        .get(fileNode.getFilePath(), true);
     ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
     MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(tsFileSequenceReader);
     List<ChunkMetaData> metaDataList = metadataQuerier
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 4db8c9d..a8465bf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -171,7 +171,7 @@ public class SealedTsFilesReader implements IReader {
 
     // to avoid too many opened files
     TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
-        .get(fileNode.getFilePath(), false);
+        .get(fileNode.getFilePath(), true);
 
     MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
     List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
index 0dda8c5..3c02930 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java
@@ -50,7 +50,7 @@ public class UnSealedTsFileReader implements IReader {
 
     TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
         .get(unsealedTsFile.getFilePath(),
-            true);
+            false);
     ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
 
     if (filter == null) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 99c925e..cff2664 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -66,9 +66,9 @@ public class FileReaderManagerTest {
 
         for (int i = 1; i <= 6; i++) {
           OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + i,
-              true);
-          manager.get(filePath + i, true);
-          Assert.assertTrue(manager.contains(filePath + i, true));
+              false);
+          manager.get(filePath + i, false);
+          Assert.assertTrue(manager.contains(filePath + i, false));
         }
 
       } catch (IOException e) {
@@ -84,9 +84,9 @@ public class FileReaderManagerTest {
 
         for (int i = 4; i <= MAX_FILE_SIZE; i++) {
           OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + i,
-              true);
-          manager.get(filePath + i, true);
-          Assert.assertTrue(manager.contains(filePath + i, true));
+              false);
+          manager.get(filePath + i, false);
+          Assert.assertTrue(manager.contains(filePath + i, false));
         }
 
       } catch (IOException e) {
@@ -100,7 +100,7 @@ public class FileReaderManagerTest {
     t2.join();
 
     for (int i = 1; i <= MAX_FILE_SIZE; i++) {
-      Assert.assertTrue(manager.contains(filePath + i, true));
+      Assert.assertTrue(manager.contains(filePath + i, false));
     }
 
     for (int i = 1; i <= MAX_FILE_SIZE; i++) {