You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/02/24 12:47:51 UTC

[incubator-iotdb] branch FileClosedDebug created (now 7b874d9)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch FileClosedDebug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 7b874d9  add more logs

This branch includes the following new commits:

     new 7b874d9  add more logs

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: add more logs

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FileClosedDebug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7b874d9f1e752de8bb3ae014d85a062dfcc08608
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Feb 24 20:47:23 2020 +0800

    add more logs
---
 .../iotdb/db/query/control/FileReaderManager.java     |  7 +++++--
 .../dataset/RawQueryDataSetWithoutValueFilter.java    | 19 +++++++++++++++----
 .../iotdb/tsfile/read/reader/DefaultTsFileInput.java  | 17 ++++++++++++++++-
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index bfda371..5d37fc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -115,7 +116,9 @@ public class FileReaderManager implements IService {
 
   private void clearMap(Map<TsFileResource, TsFileSequenceReader> readerMap,
       Map<TsFileResource, AtomicInteger> refMap) {
-    for (Map.Entry<TsFileResource, TsFileSequenceReader> entry : readerMap.entrySet()) {
+    Iterator<Map.Entry<TsFileResource, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<TsFileResource, TsFileSequenceReader> entry = iterator.next();
       TsFileSequenceReader reader = entry.getValue();
       AtomicInteger refAtom = refMap.get(entry.getKey());
 
@@ -125,7 +128,7 @@ public class FileReaderManager implements IService {
         } catch (IOException e) {
           logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
         }
-        readerMap.remove(entry.getKey());
+        iterator.remove();
         refMap.remove(entry.getKey());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index c1cb433..c50928e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -46,12 +46,14 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
   private static class ReadTask implements Runnable {
 
     private final ManagedSeriesReader reader;
+    private final String pathName;
     private BlockingQueue<BatchData> blockingQueue;
 
     public ReadTask(ManagedSeriesReader reader,
-        BlockingQueue<BatchData> blockingQueue) {
+        BlockingQueue<BatchData> blockingQueue, String pathName) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
+      this.pathName = pathName;
     }
 
     @Override
@@ -91,10 +93,19 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       } catch (InterruptedException e) {
         LOGGER.error("Interrupted while putting into the blocking queue: ", e);
         Thread.currentThread().interrupt();
+        reader.setHasRemaining(false);
       } catch (IOException e) {
-        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+        LOGGER.error(String.format("Something gets wrong while reading from the series reader %s: ", pathName), e);
+        reader.setHasRemaining(false);
+        try {
+          blockingQueue.put(SignalBatchData.getInstance());
+        } catch (InterruptedException ex) {
+          LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+          Thread.currentThread().interrupt();
+        }
       } catch (Exception e) {
         LOGGER.error("Something gets wrong: ", e);
+        reader.setHasRemaining(false);
       }
     }
   }
@@ -153,7 +164,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
       ManagedSeriesReader reader = seriesReaderList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
-      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i]));
+      TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       fillCache(i);
@@ -351,7 +362,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
           // now we should submit it again
           if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
             reader.setManagedByQueryManager(true);
-            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex]));
+            TASK_POOL_MANAGER.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
           }
         }
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 89b17cc..ca04d2f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -18,20 +18,30 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 
 public class DefaultTsFileInput implements TsFileInput {
 
+  private static final Logger LOGGER = LoggerFactory
+          .getLogger(DefaultTsFileInput.class);
+
   FileChannel channel;
 
+  private String path;
+
   public DefaultTsFileInput(Path file) throws IOException {
     channel = FileChannel.open(file, StandardOpenOption.READ);
+    path = file.toString();
   }
 
   @Override
@@ -57,7 +67,12 @@ public class DefaultTsFileInput implements TsFileInput {
 
   @Override
   public int read(ByteBuffer dst, long position) throws IOException {
-    return channel.read(dst, position);
+    try {
+      return channel.read(dst, position);
+    } catch (ClosedChannelException e) {
+      LOGGER.error(String.format("File is closed while reading %s", path));
+      throw e;
+    }
   }
 
   @Override