You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/03/12 03:35:03 UTC
[incubator-iotdb] 01/01: Release query resource while exception
happened in query producer thread
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TyBugFix
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7fa60f1a9896c3cee4771369013612dbac0711b5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Mar 12 11:34:38 2020 +0800
Release query resource while exception happened in query producer thread
---
.../iotdb/db/engine/cache/TsFileMetaDataCache.java | 9 ++--
.../engine/storagegroup/StorageGroupProcessor.java | 51 +++++++++-------------
.../dataset/RawQueryDataSetWithoutValueFilter.java | 46 +++++++++++++------
.../db/query/executor/RawDataQueryExecutor.java | 10 +++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 15 ++++++-
.../tsfile/read/common/ExceptionBatchData.java | 16 +++++++
6 files changed, 96 insertions(+), 51 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
index fd93b32..d5301e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.engine.cache;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +25,9 @@ import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This class is used to cache <code>TsFileMetaData</code> of tsfile in IoTDB.
*/
@@ -103,10 +104,10 @@ public class TsFileMetaDataCache {
}
synchronized (internPath) {
synchronized (cache) {
- if (cache.containsKey(path)) {
+ if (cache.containsKey(tsFileResource)) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
- return cache.get(path);
+ return cache.get(tsFileResource);
}
}
printCacheLog(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 16bfb4a..72e05b1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,28 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -62,14 +40,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
-import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -99,6 +73,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
@@ -1690,9 +1675,15 @@ public class StorageGroupProcessor {
if (resource.getHistoricalVersions().containsAll(seqFile.getHistoricalVersions())
&& !resource.getHistoricalVersions().equals(seqFile.getHistoricalVersions())
&& seqFile.getWriteQueryLock().writeLock().tryLock()) {
- iterator.remove();
- seqFile.remove();
- seqFile.getWriteQueryLock().writeLock().unlock();
+ try {
+ iterator.remove();
+ seqFile.remove();
+ } catch (Exception e) {
+ logger.error("Something gets wrong while removing FullyOverlapFiles ", e);
+ throw e;
+ } finally {
+ seqFile.getWriteQueryLock().writeLock().unlock();
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 1d5734a..9273c87 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -95,14 +95,24 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
Thread.currentThread().interrupt();
reader.setHasRemaining(false);
} catch (IOException e) {
- LOGGER.error(String
- .format("Something gets wrong while reading from the series reader %s: ", pathName), e);
- reader.setHasRemaining(false);
+ putExceptionBatchData(e, String.format("Something gets wrong while reading from the series reader %s: ", pathName));
} catch (Exception e) {
- LOGGER.error("Something gets wrong: ", e);
+ putExceptionBatchData(e, "Something gets wrong: ");
+ }
+ }
+
+ private void putExceptionBatchData(Exception e, String logMessage) {
+ try {
+ LOGGER.error(logMessage, e);
reader.setHasRemaining(false);
+ blockingQueue.put(new ExceptionBatchData(e));
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ LOGGER.error("Interrupted while putting ExceptionBatchData into the blocking queue: ", ex);
+ Thread.currentThread().interrupt();
}
}
+
}
private List<ManagedSeriesReader> seriesReaderList;
@@ -141,7 +151,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
* @param readers readers in List(IPointReader) structure
*/
public RawQueryDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes,
- List<ManagedSeriesReader> readers) throws InterruptedException {
+ List<ManagedSeriesReader> readers) throws IOException, InterruptedException {
super(paths, dataTypes);
this.seriesReaderList = readers;
blockingQueueArray = new BlockingQueue[readers.size()];
@@ -153,7 +163,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
init();
}
- private void init() throws InterruptedException {
+ private void init() throws IOException, InterruptedException {
timeHeap = new TreeSet<>();
for (int i = 0; i < seriesReaderList.size(); i++) {
ManagedSeriesReader reader = seriesReaderList.get(i);
@@ -177,8 +187,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
* for RPC in RawData query between client and server fill time buffer, value buffers and bitmap
* buffers
*/
- public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
- throws IOException, InterruptedException {
+ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
int seriesNum = seriesReaderList.size();
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
@@ -339,14 +348,22 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
return tsQueryDataSet;
}
- private void fillCache(int seriesIndex) throws InterruptedException {
+ private void fillCache(int seriesIndex) throws IOException, InterruptedException {
BatchData batchData = blockingQueueArray[seriesIndex].take();
// no more batch data in this time series queue
if (batchData instanceof SignalBatchData) {
noMoreDataInQueueArray[seriesIndex] = true;
- }
- // there are more batch data in this time series queue
- else {
+ } else if (batchData instanceof ExceptionBatchData) {
+ // exception happened in producer thread
+ ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
+ LOGGER.error("exception happened in producer thread", exceptionBatchData.exception);
+ if (exceptionBatchData.exception instanceof IOException) {
+ throw (IOException)exceptionBatchData.exception;
+ } else if (exceptionBatchData.exception instanceof RuntimeException) {
+ throw (RuntimeException)exceptionBatchData.exception;
+ }
+
+ } else { // there are more batch data in this time series queue
cachedBatchDataArray[seriesIndex] = batchData;
synchronized (seriesReaderList.get(seriesIndex)) {
@@ -387,7 +404,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
* for spark/hadoop/hive integration and test
*/
@Override
- protected RowRecord nextWithoutConstraint() {
+ protected RowRecord nextWithoutConstraint() throws IOException {
int seriesNum = seriesReaderList.size();
long minTime = timeHeap.pollFirst();
@@ -414,6 +431,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet {
} catch (InterruptedException e) {
LOGGER.error("Interrupted while taking from the blocking queue: ", e);
Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ LOGGER.error("Got IOException", e);
+ throw e;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 4f9ea11..4eb2cbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.db.query.executor;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -41,6 +39,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* IoTDB query executor.
*/
@@ -60,7 +62,7 @@ public class RawDataQueryExecutor {
* without filter or with global time filter.
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
- throws StorageEngineException {
+ throws StorageEngineException {
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
try {
@@ -69,6 +71,8 @@ public class RawDataQueryExecutor {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException(e.getMessage());
+ } catch (IOException e) {
+ throw new StorageEngineException(e.getMessage());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3ad965c..130dd6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -592,6 +592,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private TSExecuteStatementResp internalExecuteQueryStatement(
long statementId, PhysicalPlan plan, int fetchSize, String username) {
long t1 = System.currentTimeMillis();
+ long queryId = -1;
try {
TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
@@ -611,7 +612,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
// generate the queryId for the operation
- long queryId = generateQueryId(true);
+ queryId = generateQueryId(true);
// put it into the corresponding Set
statementId2QueryId.computeIfAbsent(statementId, k -> new HashSet<>()).add(queryId);
@@ -629,6 +630,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ if (queryId != -1) {
+ try {
+ releaseQueryResource(queryId);
+ } catch (StorageEngineException ex) {
+ logger.error("Error happened while releasing query resource: ", ex);
+ }
+ }
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
@@ -861,6 +869,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ try {
+ releaseQueryResource(req.queryId);
+ } catch (StorageEngineException ex) {
+ logger.error("Error happened while releasing query resource: ", ex);
+ }
return RpcUtils.getTSFetchResultsResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
new file mode 100644
index 0000000..0d7eb57
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
@@ -0,0 +1,16 @@
+package org.apache.iotdb.tsfile.read.common;
+
+
+public class ExceptionBatchData extends BatchData {
+
+ public Exception exception;
+
+ public ExceptionBatchData(Exception exception) {
+ this.exception = exception;
+ }
+
+ @Override
+ public boolean hasCurrent() {
+ throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
+ }
+}