You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/27 11:13:34 UTC
[incubator-iotdb] branch master updated: [IOTDB-512] file reader close bug (#846)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5337696 [IOTDB-512] file reader close bug (#846)
5337696 is described below
commit 53376960c0fe00fb3d71bf29a3a3b80252879d5e
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Feb 27 19:13:22 2020 +0800
[IOTDB-512] file reader close bug (#846)
* fix TsFileSequenceReader being closed bug
---
.../apache/iotdb/db/engine/cache/ChunkCache.java | 37 ++++++++++++++--------
.../iotdb/db/query/control/FileReaderManager.java | 34 +++++++++++++-------
.../iotdb/db/query/control/QueryFileManager.java | 17 +++++-----
.../dataset/RawQueryDataSetWithoutValueFilter.java | 13 +++++---
.../iotdb/db/query/reader/series/SeriesReader.java | 8 ++---
.../iotdb/tsfile/read/TsFileSequenceReader.java | 27 +++++++---------
.../tsfile/read/reader/DefaultTsFileInput.java | 2 +-
7 files changed, 79 insertions(+), 59 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 24a5ad1..90e6105 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,21 +85,31 @@ public class ChunkCache {
lock.readLock().unlock();
}
- lock.writeLock().lock();
- if (lruCache.containsKey(chunkMetaData)) {
- lock.readLock().lock();
- lock.writeLock().unlock();
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- Chunk chunk = lruCache.get(chunkMetaData);
- lock.readLock().unlock();
+ Lock cacheLock = lock.writeLock();
+ try {
+ cacheLock.lock();
+ if (lruCache.containsKey(chunkMetaData)) {
+ try {
+ cacheLock = lock.readLock();
+ cacheLock.lock();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ cacheHitNum.incrementAndGet();
+ printCacheLog(true);
+ Chunk chunk = lruCache.get(chunkMetaData);
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
+ }
+ printCacheLog(false);
+ Chunk chunk = reader.readMemChunk(chunkMetaData);
+ lruCache.put(chunkMetaData, chunk);
return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
+ } catch (IOException e) {
+ logger.error("something wrong happened while reading {}", reader.getFileName());
+ throw e;
+ } finally {
+ cacheLock.unlock();
}
- printCacheLog(false);
- Chunk chunk = reader.readMemChunk(chunkMetaData);
- lruCache.put(chunkMetaData, chunk);
- lock.writeLock().unlock();
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(), reader.getEndianType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index bfda371..e2536f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ public class FileReaderManager implements IService {
private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
Map<TsFileResource, AtomicInteger> refMap) {
- for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
+ Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
TsFileSequenceReader reader = entry.getValue();
AtomicInteger refAtom = refMap.get(entry.getKey());
@@ -125,8 +128,11 @@ public class FileReaderManager implements IService {
} catch (IOException e) {
logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
- readerMap.remove(entry.getKey());
+ iterator.remove();
refMap.remove(entry.getKey());
+ if (resourceLogger.isDebugEnabled()) {
+ resourceLogger.debug("{} TsFileReader is closed because of no reference.", entry.getValue().getFileName());
+ }
}
}
}
@@ -186,9 +192,9 @@ public class FileReaderManager implements IService {
void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
synchronized (this) {
if (!isClosed && unclosedReferenceMap.containsKey(tsFile)) {
- unclosedReferenceMap.get(tsFile).getAndDecrement();
+ unclosedReferenceMap.get(tsFile).decrementAndGet();
} else if (closedReferenceMap.containsKey(tsFile)){
- closedReferenceMap.get(tsFile).getAndDecrement();
+ closedReferenceMap.get(tsFile).decrementAndGet();
}
}
tsFile.getWriteQueryLock().readLock().unlock();
@@ -199,21 +205,25 @@ public class FileReaderManager implements IService {
* integration tests will not conflict with each other.
*/
public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
- for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : closedFileReaderMap.entrySet()) {
+ Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
entry.getValue().close();
- if (resourceLogger.isInfoEnabled()) {
- resourceLogger.info("{} closedTsFileReader is closed.", entry.getValue().getFileName());
+ if (resourceLogger.isDebugEnabled()) {
+ resourceLogger.debug("{} closedTsFileReader is closed.", entry.getValue().getFileName());
}
closedReferenceMap.remove(entry.getKey());
- closedFileReaderMap.remove(entry.getKey());
+ iterator.remove();
}
- for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : unclosedFileReaderMap.entrySet()) {
+ iterator = unclosedFileReaderMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
entry.getValue().close();
- if (resourceLogger.isInfoEnabled()) {
- resourceLogger.info("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
+ if (resourceLogger.isDebugEnabled()) {
+ resourceLogger.debug("{} unclosedTsFileReader is closed.", entry.getValue().getFileName());
}
unclosedReferenceMap.remove(entry.getKey());
- unclosedFileReaderMap.remove(entry.getKey());
+ iterator.remove();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index ad2cfd2..8224aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -18,14 +18,11 @@
*/
package org.apache.iotdb.db.query.control;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
@@ -77,8 +74,10 @@ public class QueryFileManager {
// this file may be deleted just before we lock it
if (tsFileResource.isDeleted()) {
Map<Long, Set<TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
- pathMap.get(queryId).remove(tsFileResource);
- FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+ // This resource may be removed by other threads of this query.
+ if (pathMap.get(queryId).remove(tsFileResource)) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+ }
iterator.remove();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index c1cb433..233b008 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
private static class ReadTask implements Runnable {
private final ManagedSeriesReader reader;
+ private final String pathName;
private BlockingQueue<BatchData> blockingQueue;
public ReadTask(ManagedSeriesReader reader,
- BlockingQueue<BatchData> blockingQueue) {
+ BlockingQueue<BatchData> blockingQueue, String pathName) {
this.reader = reader;
this.blockingQueue = blockingQueue;
+ this.pathName = pathName;
}
@Override
@@ -91,10 +93,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while putting into the blocking queue: ", e);
Thread.currentThread().interrupt();
+ reader.setHasRemaining(false);
} catch (IOException e) {
- LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+ LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
+ reader.setHasRemaining(false);
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
+ reader.setHasRemaining(false);
}
}
}
@@ -153,7 +158,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
ManagedSeriesReader reader = seriesReaderList.get(i);
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
- TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
+ TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
}
for (int i = 0; i < seriesReaderList.size(); i++) {
fillCache(i);
@@ -351,7 +356,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
// now we should submit it again
if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
reader.setManagedByQueryManager(true);
- TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
+ TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index f9da1c6..8b1f795 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -390,11 +390,9 @@ public class SeriesReader {
}
for (ChunkMetaData data : currentChunkMetaDataList) {
- if (data.getChunkLoader() == null) {
- TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
- .get(resource, resource.isClosed());
- data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
- }
+ TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
+ .get(resource, resource.isClosed());
+ data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk();
if (memChunks != null) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index b8cd41d..6fd2e43 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -18,16 +18,6 @@
*/
package org.apache.iotdb.tsfile.read;
-import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -37,11 +27,7 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.*;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -54,6 +40,17 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
+
public class TsFileSequenceReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 89b17cc..d20a313 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -28,7 +28,7 @@ import java.nio.file.StandardOpenOption;
public class DefaultTsFileInput implements TsFileInput {
- FileChannel channel;
+ private FileChannel channel;
public DefaultTsFileInput(Path file) throws IOException {
channel = FileChannel.open(file, StandardOpenOption.READ);