You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/02/24 12:47:52 UTC
[incubator-iotdb] 01/01: add more logs
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FileClosedDebug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7b874d9f1e752de8bb3ae014d85a062dfcc08608
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Feb 24 20:47:23 2020 +0800
add more logs
---
.../iotdb/db/query/control/FileReaderManager.java | 7 +++++--
.../dataset/RawQueryDataSetWithoutValueFilter.java | 19 +++++++++++++++----
.../iotdb/tsfile/read/reader/DefaultTsFileInput.java | 17 ++++++++++++++++-
3 files changed, 36 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index bfda371..5d37fc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ public class FileReaderManager implements IService {
private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
Map<TsFileResource, AtomicInteger> refMap) {
- for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
+ Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
TsFileSequenceReader reader = entry.getValue();
AtomicInteger refAtom = refMap.get(entry.getKey());
@@ -125,7 +128,7 @@ public class FileReaderManager implements IService {
} catch (IOException e) {
logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
- readerMap.remove(entry.getKey());
+ iterator.remove();
refMap.remove(entry.getKey());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index c1cb433..c50928e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
private static class ReadTask implements Runnable {
private final ManagedSeriesReader reader;
+ private final String pathName;
private BlockingQueue<BatchData> blockingQueue;
public ReadTask(ManagedSeriesReader reader,
- BlockingQueue<BatchData> blockingQueue) {
+ BlockingQueue<BatchData> blockingQueue, String pathName) {
this.reader = reader;
this.blockingQueue = blockingQueue;
+ this.pathName = pathName;
}
@Override
@@ -91,10 +93,19 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while putting into the blocking queue: ", e);
Thread.currentThread().interrupt();
+ reader.setHasRemaining(false);
} catch (IOException e) {
- LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+ LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
+ reader.setHasRemaining(false);
+ try {
+ blockingQueue.put(SignalBatchData.getInstance());
+ } catch (InterruptedException ex) { // log the interrupt itself (ex), not the enclosing IOException e
+ LOGGER.error("Interrupted while putting into the blocking queue: ", ex);
+ Thread.currentThread().interrupt();
+ }
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
+ reader.setHasRemaining(false);
}
}
}
@@ -153,7 +164,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
ManagedSeriesReader reader = seriesReaderList.get(i);
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
- TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
+ TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
}
for (int i = 0; i < seriesReaderList.size(); i++) {
fillCache(i);
@@ -351,7 +362,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
// now we should submit it again
if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
reader.setManagedByQueryManager(true);
- TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
+ TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
}
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 89b17cc..ca04d2f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -18,20 +18,30 @@
*/
package org.apache.iotdb.tsfile.read.reader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class DefaultTsFileInput implements TsFileInput {
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(DefaultTsFileInput.class);
+
FileChannel channel;
+ private String path;
+
public DefaultTsFileInput(Path file) throws IOException {
channel = FileChannel.open(file, StandardOpenOption.READ);
+ path = file.toString();
}
@Override
@@ -57,7 +67,12 @@ public class DefaultTsFileInput implements TsFileInput {
@Override
public int read(ByteBuffer dst, long position) throws IOException {
- return channel.read(dst, position);
+ try {
+ return channel.read(dst, position);
+ } catch (ClosedChannelException e) {
+ LOGGER.error(String.format("File is closed while reading %s", path));
+ throw e;
+ }
}
@Override