You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/02/24 12:47:52 UTC

[incubator-iotdb] 01/01: add more logs

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FileClosedDebug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7b874d9f1e752de8bb3ae014d85a062dfcc08608
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Feb 24 20:47:23 2020 +0800

    add more logs
---
 .../iotdb/db/query/control/FileReaderManager.java     |  7 +++++--
 .../dataset/RawQueryDataSetWithoutValueFilter.java    | 19 +++++++++++++++----
 .../iotdb/tsfile/read/reader/DefaultTsFileInput.java  | 17 ++++++++++++++++-
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index bfda371..5d37fc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ public class FileReaderManager implements IService {
 
   private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
       Map<TsFileResource, AtomicInteger> refMap) {
-    for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
+    Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
       TsFileSequenceReader reader = entry.getValue();
       AtomicInteger refAtom = refMap.get(entry.getKey());
 
@@ -125,7 +128,7 @@ public class FileReaderManager implements IService {
         } catch (IOException e) {
           logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
         }
-        readerMap.remove(entry.getKey());
+        iterator.remove();
         refMap.remove(entry.getKey());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index c1cb433..c50928e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
   private static class ReadTask implements Runnable {
 
     private final ManagedSeriesReader reader;
+    private final String pathName;
     private BlockingQueue<BatchData> blockingQueue;
 
     public ReadTask(ManagedSeriesReader reader,
-        BlockingQueue<BatchData> blockingQueue) {
+        BlockingQueue<BatchData> blockingQueue, String pathName) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
+      this.pathName = pathName;
     }
 
     @Override
@@ -91,10 +93,19 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       } catch (InterruptedException e) {
         LOGGER.error("Interrupted while putting into the blocking queue: ", e);
         Thread.currentThread().interrupt();
+        reader.setHasRemaining(false);
       } catch (IOException e) {
-        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+        LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
+        reader.setHasRemaining(false);
+        try {
+          blockingQueue.put(SignalBatchData.getInstance());
+        } catch (InterruptedException ex) {
+          LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+          Thread.currentThread().interrupt();
+        }
       } catch (Exception e) {
         LOGGER.error("Something gets wrong: ", e);
+        reader.setHasRemaining(false);
       }
     }
   }
@@ -153,7 +164,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       ManagedSeriesReader reader = seriesReaderList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
-      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
+      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       fillCache(i);
@@ -351,7 +362,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
           // now we should submit it again
           if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
             reader.setManagedByQueryManager(true);
-            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
+            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
           }
         }
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 89b17cc..ca04d2f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -18,20 +18,30 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 
 public class DefaultTsFileInput implements TsFileInput {
 
+  private static final Logger LOGGER = LoggerFactory
+          .getLogger(DefaultTsFileInput.class);
+
   FileChannel channel;
 
+  private String path;
+
   public DefaultTsFileInput(Path file) throws IOException {
     channel = FileChannel.open(file, StandardOpenOption.READ);
+    path = file.toString();
   }
 
   @Override
@@ -57,7 +67,12 @@ public class DefaultTsFileInput implements TsFileInput {
 
   @Override
   public int read(ByteBuffer dst, long position) throws IOException {
-    return channel.read(dst, position);
+    try {
+      return channel.read(dst, position);
+    } catch (ClosedChannelException e) {
+      LOGGER.error(String.format("File is closed while reading %s", path));
+      throw e;
+    }
   }
 
   @Override