You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/03/12 03:35:03 UTC

[incubator-iotdb] 01/01: Release query resource while exception happened in query producer thread

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TyBugFix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7fa60f1a9896c3cee4771369013612dbac0711b5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Mar 12 11:34:38 2020 +0800

    Release query resource while exception happened in query producer thread
---
 .../iotdb/db/engine/cache/TsFileMetaDataCache.java |  9 ++--
 .../engine/storagegroup/StorageGroupProcessor.java | 51 +++++++++-------------
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 46 +++++++++++++------
 .../db/query/executor/RawDataQueryExecutor.java    | 10 +++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 ++++++-
 .../tsfile/read/common/ExceptionBatchData.java     | 16 +++++++
 6 files changed, 96 insertions(+), 51 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
index fd93b32..d5301e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.engine.cache;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +25,9 @@ import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB.
  */
@@ -103,10 +104,10 @@ public class TsFileMetaDataCache {
     }
     synchronized (internPath) {
       synchronized (cache) {
-        if (cache.containsKey(path)) {
+        if (cache.containsKey(tsFileResource)) {
           cacheHitNum.incrementAndGet();
           printCacheLog(true);
-          return cache.get(path);
+          return cache.get(tsFileResource);
         }
       }
       printCacheLog(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 16bfb4a..72e05b1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,28 +18,6 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -62,14 +40,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.*;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
-import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -99,6 +73,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
@@ -1690,9 +1675,15 @@ public class StorageGroupProcessor {
       if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
           && !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
           && seqFile.getWriteQueryLock().writeLock().tryLock()) {
-        iterator.remove();
-        seqFile.remove();
-        seqFile.getWriteQueryLock().writeLock().unlock();
+        try {
+          iterator.remove();
+          seqFile.remove();
+        } catch (Exception e) {
+          logger.error("Something gets wrong while removing FullyOverlapFiles ", e);
+          throw e;
+        } finally {
+          seqFile.getWriteQueryLock().writeLock().unlock();
+        }
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 1d5734a..9273c87 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -95,14 +95,24 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
         Thread.currentThread().interrupt();
         reader.setHasRemaining(false);
       } catch (IOException e) {
-        LOGGER.error(String
-            .format("Something gets wrong while reading from the series reader %s: ", pathName), e);
-        reader.setHasRemaining(false);
+        putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName));
       } catch (Exception e) {
-        LOGGER.error("Something gets wrong: ", e);
+        putExceptionBatchData(e, "Something gets wrong: ");
+      }
+    }
+
+    private void putExceptionBatchData(Exception e, String logMessage) {
+      try {
+        LOGGER.error(logMessage, e);
         reader.setHasRemaining(false);
+        blockingQueue.put(new ExceptionBatchData(e));
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+        LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex);
+        Thread.currentThread().interrupt();
       }
     }
+
   }
 
   private List<ManagedSeriesReader> seriesReaderList;
@@ -141,7 +151,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * @param readers   readers in List(IPointReader) structure
    */
   public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers) throws InterruptedException {
+      List<ManagedSeriesReader> readers) throws IOException, InterruptedException {
     super(paths, dataTypes);
     this.seriesReaderList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
@@ -153,7 +163,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
     init();
   }
 
-  private void init() throws InterruptedException {
+  private void init() throws IOException, InterruptedException {
     timeHeap = new TreeSet<>();
     for (int i = 0; i < seriesReaderList.size(); i++) {
       ManagedSeriesReader reader = seriesReaderList.get(i);
@@ -177,8 +187,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * for RPC in RawData query between client and server fill time buffer, value buffers and bitmap
    * buffers
    */
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
-      throws IOException, InterruptedException {
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
     int seriesNum = seriesReaderList.size();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
 
@@ -339,14 +348,22 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
     return tsQueryDataSet;
   }
 
-  private void fillCache(int seriesIndex) throws InterruptedException {
+  private void fillCache(int seriesIndex) throws IOException, InterruptedException {
     BatchData batchData = blockingQueueArray[seriesIndex].take();
     // no more batch data in this time series queue
     if (batchData instanceof SignalBatchData) {
       noMoreDataInQueueArray[seriesIndex] = true;
-    }
-    // there are more batch data in this time series queue
-    else {
+    } else if (batchData instanceof ExceptionBatchData) {
+      // exception happened in producer thread
+      ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
+      LOGGER.error("exception happened in producer thread", exceptionBatchData.exception);
+      if (exceptionBatchData.exception instanceof IOException) {
+        throw (IOException)exceptionBatchData.exception;
+      } else if (exceptionBatchData.exception instanceof RuntimeException) {
+        throw (RuntimeException)exceptionBatchData.exception;
+      }
+
+    } else {   // there are more batch data in this time series queue
       cachedBatchDataArray[seriesIndex] = batchData;
 
       synchronized (seriesReaderList.get(seriesIndex)) {
@@ -387,7 +404,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
    * for spark/hadoop/hive integration and test
    */
   @Override
-  protected RowRecord nextWithoutConstraint() {
+  protected RowRecord nextWithoutConstraint() throws IOException {
     int seriesNum = seriesReaderList.size();
 
     long minTime = timeHeap.pollFirst();
@@ -414,6 +431,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
           } catch (InterruptedException e) {
             LOGGER.error("Interrupted while taking from the blocking queue: ", e);
             Thread.currentThread().interrupt();
+          } catch (IOException e) {
+            LOGGER.error("Got IOException", e);
+            throw e;
           }
         }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 4f9ea11..4eb2cbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.query.executor;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -41,6 +39,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * IoTDB query executor.
  */
@@ -60,7 +62,7 @@ public class RawDataQueryExecutor {
    * without filter or with global time filter.
    */
   public QueryDataSet executeWithoutValueFilter(QueryContext context)
-      throws StorageEngineException {
+          throws StorageEngineException {
 
     List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
     try {
@@ -69,6 +71,8 @@ public class RawDataQueryExecutor {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
+    } catch (IOException e) {
+      throw new StorageEngineException(e.getMessage());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3ad965c..130dd6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -592,6 +592,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private TSExecuteStatementResp internalExecuteQueryStatement(
       long statementId, PhysicalPlan plan, int fetchSize, String username) {
     long t1 = System.currentTimeMillis();
+    long queryId = -1;
     try {
       TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
 
@@ -611,7 +612,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
       // generate the queryId for the operation
-      long queryId = generateQueryId(true);
+      queryId = generateQueryId(true);
       // put it into the corresponding Set
 
       statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
@@ -629,6 +630,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return resp;
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      if (queryId != -1) {
+        try {
+          releaseQueryResource(queryId);
+        } catch (StorageEngineException ex) {
+          logger.error("Error happened while releasing query resource: ", ex);
+        }
+      }
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
@@ -861,6 +869,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      try {
+        releaseQueryResource(req.queryId);
+      } catch (StorageEngineException ex) {
+        logger.error("Error happened while releasing query resource: ", ex);
+      }
       return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
new file mode 100644
index 0000000..0d7eb57
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
@@ -0,0 +1,16 @@
+package org.apache.iotdb.tsfile.read.common;
+
+
+public class ExceptionBatchData extends BatchData {
+
+  public Exception exception;
+
+  public ExceptionBatchData(Exception exception) {
+    this.exception = exception;
+  }
+
+  @Override
+  public boolean hasCurrent() {
+    throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
+  }
+}