You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/03/12 03:35:02 UTC

[incubator-iotdb] branch TyBugFix created (now 7fa60f1)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch TyBugFix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 7fa60f1  Release query resource while exception happened in query producer thread

This branch includes the following new commits:

     new 7fa60f1  Release query resource while exception happened in query producer thread

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: Release query resource while exception happened in query producer thread

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TyBugFix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7fa60f1a9896c3cee4771369013612dbac0711b5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Mar 12 11:34:38 2020 +0800

    Release query resource while exception happened in query producer thread
---
 .../iotdb/db/engine/cache/TsFileMetaDataCache.java |  9 ++--
 .../engine/storagegroup/StorageGroupProcessor.java | 51 +++++++++-------------
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 46 +++++++++++++------
 .../db/query/executor/RawDataQueryExecutor.java    | 10 +++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 ++++++-
 .../tsfile/read/common/ExceptionBatchData.java     | 16 +++++++
 6 files changed, 96 insertions(+), 51 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
index fd93b32..d5301e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.engine.cache;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +25,9 @@ import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB.
  */
@@ -103,10 +104,10 @@ public class TsFileMetaDataCache {
     }
     synchronized (internPath) {
       synchronized (cache) {
-        if (cache.containsKey(path)) {
+        if (cache.containsKey(tsFileResource)) {
           cacheHitNum.incrementAndGet();
           printCacheLog(true);
-          return cache.get(path);
+          return cache.get(tsFileResource);
         }
       }
       printCacheLog(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 16bfb4a..72e05b1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,28 +18,6 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -62,14 +40,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.*;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
-import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -99,6 +73,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
@@ -1690,9 +1675,15 @@ public class StorageGroupProcessor {
       if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
           && !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
           && seqFile.getWriteQueryLock().writeLock().tryLock()) {
-        iterator.remove();
-        seqFile.remove();
-        seqFile.getWriteQueryLock().writeLock().unlock();
+        try {
+          iterator.remove();
+          seqFile.remove();
+        } catch (Exception e) {
+          logger.error("Something gets wrong while removing FullyOverlapFiles ", e);
+          throw e;
+        } finally {
+          seqFile.getWriteQueryLock().writeLock().unlock();
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 1d5734a..9273c87 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -95,14 +95,24 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
         Thread.currentThread().interrupt();
         reader.setHasRemaining(false);
       } catch (IOException e) {
-        LOGGER.error(String
-            .format("Something gets wrong while reading from the series reader %s: ", pathName), e);
-        reader.setHasRemaining(false);
+        putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName));
       } catch (Exception e) {
-        LOGGER.error("Something gets wrong: ", e);
+        putExceptionBatchData(e, "Something gets wrong: ");
+      }
+    }
+
+    private void putExceptionBatchData(Exception e, String logMessage) {
+      try {
+        LOGGER.error(logMessage, e);
         reader.setHasRemaining(false);
+        blockingQueue.put(new ExceptionBatchData(e));
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+        LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex);
+        Thread.currentThread().interrupt();
       }
     }
+
   }
 
   private List<ManagedSeriesReader> seriesReaderList;
@@ -141,7 +151,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * @param readers   readers in List(IPointReader) structure
    */
   public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers) throws InterruptedException {
+      List<ManagedSeriesReader> readers) throws IOException, InterruptedException {
     super(paths, dataTypes);
     this.seriesReaderList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
@@ -153,7 +163,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
     init();
   }
 
-  private void init() throws InterruptedException {
+  private void init() throws IOException, InterruptedException {
     timeHeap = new TreeSet<>();
     for (int i = 0; i < seriesReaderList.size(); i++) {
       ManagedSeriesReader reader = seriesReaderList.get(i);
@@ -177,8 +187,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * for RPC in RawData query between client and server fill time buffer, value buffers and bitmap
    * buffers
    */
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
-      throws IOException, InterruptedException {
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
     int seriesNum = seriesReaderList.size();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
 
@@ -339,14 +348,22 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
     return tsQueryDataSet;
   }
 
-  private void fillCache(int seriesIndex) throws InterruptedException {
+  private void fillCache(int seriesIndex) throws IOException, InterruptedException {
     BatchData batchData = blockingQueueArray[seriesIndex].take();
     // no more batch data in this time series queue
     if (batchData instanceof SignalBatchData) {
       noMoreDataInQueueArray[seriesIndex] = true;
-    }
-    // there are more batch data in this time series queue
-    else {
+    } else if (batchData instanceof ExceptionBatchData) {
+      // exception happened in producer thread
+      ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
+      LOGGER.error("exception happened in producer thread", exceptionBatchData.exception);
+      if (exceptionBatchData.exception instanceof IOException) {
+        throw (IOException)exceptionBatchData.exception;
+      } else if (exceptionBatchData.exception instanceof RuntimeException) {
+        throw (RuntimeException)exceptionBatchData.exception;
+      }
+
+    } else {   // there are more batch data in this time series queue
       cachedBatchDataArray[seriesIndex] = batchData;
 
       synchronized (seriesReaderList.get(seriesIndex)) {
@@ -387,7 +404,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * for spark/hadoop/hive integration and test
    */
   @Override
-  protected RowRecord nextWithoutConstraint() {
+  protected RowRecord nextWithoutConstraint() throws IOException {
     int seriesNum = seriesReaderList.size();
 
     long minTime = timeHeap.pollFirst();
@@ -414,6 +431,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
           } catch (InterruptedException e) {
             LOGGER.error("Interrupted while taking from the blocking queue: ", e);
             Thread.currentThread().interrupt();
+          } catch (IOException e) {
+            LOGGER.error("Got IOException", e);
+            throw e;
           }
         }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 4f9ea11..4eb2cbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.query.executor;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -41,6 +39,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * IoTDB query executor.
  */
@@ -60,7 +62,7 @@ public class RawDataQueryExecutor {
    * without filter or with global time filter.
    */
   public QueryDataSet executeWithoutValueFilter(QueryContext context)
-      throws StorageEngineException {
+          throws StorageEngineException {
 
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
     try {
@@ -69,6 +71,8 @@ public class RawDataQueryExecutor {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
+    } catch (IOException e) {
+      throw new StorageEngineException(e.getMessage());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3ad965c..130dd6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -592,6 +592,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private TSExecuteStatementResp internalExecuteQueryStatement(
       long statementId, PhysicalPlan plan, int fetchSize, String username) {
     long t1 = System.currentTimeMillis();
+    long queryId = -1;
     try {
       TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
 
@@ -611,7 +612,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
       // generate the queryId for the operation
-      long queryId = generateQueryId(true);
+      queryId = generateQueryId(true);
       // put it into the corresponding Set
 
       statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
@@ -629,6 +630,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return resp;
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      if (queryId != -1) {
+        try {
+          releaseQueryResource(queryId);
+        } catch (StorageEngineException ex) {
+          logger.error("Error happened while releasing query resource: ", ex);
+        }
+      }
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
@@ -861,6 +869,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      try {
+        releaseQueryResource(req.queryId);
+      } catch (StorageEngineException ex) {
+        logger.error("Error happened while releasing query resource: ", ex);
+      }
       return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
new file mode 100644
index 0000000..0d7eb57
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
@@ -0,0 +1,16 @@
+package org.apache.iotdb.tsfile.read.common;
+
+
+public class ExceptionBatchData extends BatchData {
+
+  public Exception exception;
+
+  public ExceptionBatchData(Exception exception) {
+    this.exception = exception;
+  }
+
+  @Override
+  public boolean hasCurrent() {
+    throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
+  }
+}