You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/27 11:13:34 UTC

[incubator-iotdb] branch master updated: [IOTDB-512] file reader close bug (#846)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5337696  [IOTDB-512] file reader close bug (#846)
5337696 is described below

commit 53376960c0fe00fb3d71bf29a3a3b80252879d5e
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Feb 27 19:13:22 2020 +0800

    [IOTDB-512] file reader close bug (#846)
    
    * fix the bug of TsFileSequenceReader being closed
---
 .../apache/iotdb/db/engine/cache/ChunkCache.java   | 37 ++++++++++++++--------
 .../iotdb/db/query/control/FileReaderManager.java  | 34 +++++++++++++-------
 .../iotdb/db/query/control/QueryFileManager.java   | 17 +++++-----
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 13 +++++---
 .../iotdb/db/query/reader/series/SeriesReader.java |  8 ++---
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 27 +++++++---------
 .../tsfile/read/reader/DefaultTsFileInput.java     |  2 +-
 7 files changed, 79 insertions(+), 59 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 24a5ad1..90e6105 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -84,21 +85,31 @@ public class ChunkCache {
       lock.readLock().unlock();
     }
 
-    lock.writeLock().lock();
-    if (lruCache.containsKey(chunkMetaData)) {
-      lock.readLock().lock();
-      lock.writeLock().unlock();
-      cacheHitNum.incrementAndGet();
-      printCacheLog(true);
-      Chunk chunk = lruCache.get(chunkMetaData);
-      lock.readLock().unlock();
+    Lock cacheLock = lock.writeLock();
+    try {
+      cacheLock.lock();
+      if (lruCache.containsKey(chunkMetaData)) {
+        try {
+          cacheLock = lock.readLock();
+          cacheLock.lock();
+        } finally {
+          lock.writeLock().unlock();
+        }
+        cacheHitNum.incrementAndGet();
+        printCacheLog(true);
+        Chunk chunk = lruCache.get(chunkMetaData);
+        return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
+      }
+      printCacheLog(false);
+      Chunk chunk = reader.readMemChunk(chunkMetaData);
+      lruCache.put(chunkMetaData, chunk);
       return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
+    } catch (IOException e) {
+      logger.error("something wrong happened while reading {}", reader.getFileName());
+      throw e;
+    } finally {
+      cacheLock.unlock();
     }
-    printCacheLog(false);
-    Chunk chunk = reader.readMemChunk(chunkMetaData);
-    lruCache.put(chunkMetaData, chunk);
-    lock.writeLock().unlock();
-    return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
 
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index bfda371..e2536f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ public class FileReaderManager implements IService {
 
   private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
       Map<TsFileResource, AtomicInteger> refMap) {
-    for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
+    Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
       TsFileSequenceReader reader = entry.getValue();
       AtomicInteger refAtom = refMap.get(entry.getKey());
 
@@ -125,8 +128,11 @@ public class FileReaderManager implements IService {
         } catch (IOException e) {
           logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
         }
-        readerMap.remove(entry.getKey());
+        iterator.remove();
         refMap.remove(entry.getKey());
+        if (resourceLogger.isDebugEnabled()) {
+          resourceLogger.debug("{} TsFileReader is closed because of no reference.", entry.getValue().getFileName());
+        }
       }
     }
   }
@@ -186,9 +192,9 @@ public class FileReaderManager implements IService {
   void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
     synchronized (this) {
       if (!isClosed && unclosedReferenceMap.containsKey(tsFile)) {
-        unclosedReferenceMap.get(tsFile).getAndDecrement();
+        unclosedReferenceMap.get(tsFile).decrementAndGet();
       } else if (closedReferenceMap.containsKey(tsFile)){
-        closedReferenceMap.get(tsFile).getAndDecrement();
+        closedReferenceMap.get(tsFile).decrementAndGet();
       }
     }
     tsFile.getWriteQueryLock().readLock().unlock();
@@ -199,21 +205,25 @@ public class FileReaderManager implements IService {
    * integration tests will not conflict with each other.
    */
   public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
-    for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : closedFileReaderMap.entrySet()) {
+    Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
       entry.getValue().close();
-      if (resourceLogger.isInfoEnabled()) {
-        resourceLogger.info("{} closedTsFileReader is closed.", entry.getValue().getFileName());
+      if (resourceLogger.isDebugEnabled()) {
+        resourceLogger.debug("{} closedTsFileReader is closed.", entry.getValue().getFileName());
       }
       closedReferenceMap.remove(entry.getKey());
-      closedFileReaderMap.remove(entry.getKey());
+      iterator.remove();
     }
-    for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : unclosedFileReaderMap.entrySet()) {
+    iterator = unclosedFileReaderMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
       entry.getValue().close();
-      if (resourceLogger.isInfoEnabled()) {
-        resourceLogger.info("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
+      if (resourceLogger.isDebugEnabled()) {
+        resourceLogger.debug("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
       }
       unclosedReferenceMap.remove(entry.getKey());
-      unclosedFileReaderMap.remove(entry.getKey());
+      iterator.remove();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index ad2cfd2..8224aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -18,14 +18,11 @@
  */
 package org.apache.iotdb.db.query.control;
 
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * <p>
@@ -77,8 +74,10 @@ public class QueryFileManager {
       // this file may be deleted just before we lock it
       if (tsFileResource.isDeleted()) {
         Map<Long, Set<TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
-        pathMap.get(queryId).remove(tsFileResource);
-        FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+        // This resource may be removed by other threads of this query.
+        if (pathMap.get(queryId).remove(tsFileResource)) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+        }
         iterator.remove();
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index c1cb433..233b008 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
   private static class ReadTask implements Runnable {
 
     private final ManagedSeriesReader reader;
+    private final String pathName;
     private BlockingQueue<BatchData> blockingQueue;
 
     public ReadTask(ManagedSeriesReader reader,
-        BlockingQueue<BatchData> blockingQueue) {
+        BlockingQueue<BatchData> blockingQueue, String pathName) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
+      this.pathName = pathName;
     }
 
     @Override
@@ -91,10 +93,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       } catch (InterruptedException e) {
         LOGGER.error("Interrupted while putting into the blocking queue: ", e);
         Thread.currentThread().interrupt();
+        reader.setHasRemaining(false);
       } catch (IOException e) {
-        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+        LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
+        reader.setHasRemaining(false);
       } catch (Exception e) {
         LOGGER.error("Something gets wrong: ", e);
+        reader.setHasRemaining(false);
       }
     }
   }
@@ -153,7 +158,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       ManagedSeriesReader reader = seriesReaderList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
-      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
+      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       fillCache(i);
@@ -351,7 +356,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
           // now we should submit it again
           if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
             reader.setManagedByQueryManager(true);
-            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
+            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index f9da1c6..8b1f795 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -390,11 +390,9 @@ public class SeriesReader {
     }
 
     for (ChunkMetaData data : currentChunkMetaDataList) {
-      if (data.getChunkLoader() == null) {
-        TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
-            .get(resource, resource.isClosed());
-        data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
-      }
+      TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
+          .get(resource, resource.isClosed());
+      data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
     }
     List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
     if (memChunks != null) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index b8cd41d..6fd2e43 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -18,16 +18,6 @@
  */
 package org.apache.iotdb.tsfile.read;
 
-import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -37,11 +27,7 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -54,6 +40,17 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
+
 public class TsFileSequenceReader implements AutoCloseable {
 
   private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 89b17cc..d20a313 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -28,7 +28,7 @@ import java.nio.file.StandardOpenOption;
 
 public class DefaultTsFileInput implements TsFileInput {
 
-  FileChannel channel;
+  private FileChannel channel;
 
   public DefaultTsFileInput(Path file) throws IOException {
     channel = FileChannel.open(file, StandardOpenOption.READ);