You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/04/02 11:58:46 UTC
[incubator-iotdb] branch unifiy_query_control updated: bug fix:
multiple queries in the same statement will cause resource leak.
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch unifiy_query_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/unifiy_query_control by this push:
new 6751e16 bug fix: multiple queries in the same statement will cause resource leak.
6751e16 is described below
commit 6751e16d81b904a17ca9eef9aa1e066ce996b802
Author: 江天 <jt...@163.com>
AuthorDate: Tue Apr 2 19:57:36 2019 +0800
bug fix: multiple queries in the same statement will cause resource leak.
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 54 ++++++++++++++++------
.../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 8 ++--
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 9 ++--
service-rpc/src/main/thrift/rpc.thrift | 2 +
4 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad3c855..a68f586 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -79,6 +79,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -104,7 +105,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private QueryContext context;
+ private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
public TSServiceImpl() throws IOException {
// do nothing because there is no need
@@ -185,10 +186,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public TSCloseOperationResp closeOperation(TSCloseOperationReq req) throws TException {
LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
try {
- // end query for all the query tokens created by current thread
- if(context != null) {
- QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
- }
+
+ releaseQueryResource(req);
clearAllStatusForCurrentRequest();
} catch (FileNodeManagerException e) {
@@ -197,6 +196,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return new TSCloseOperationResp(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
}
+ private void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException { // ends the calling thread's query jobs: all of them, or only req.queryId
+ Map<Long, QueryContext> contextMap = contextMapLocal.get(); // thread-local map: queryId -> QueryContext
+ if (contextMap == null) {
+ return; // no map was ever set for this thread, so there is nothing to release
+ }
+ if(req == null || req.queryId == -1) { // a null request or queryId -1 selects "release everything"
+ // end query for all the query tokens created by current thread
+ for (QueryContext context : contextMap.values()) {
+ QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+ } // NOTE(review): entries are not removed from contextMap here - confirm something else clears them, otherwise a later close-all ends the same job ids again
+ } else {
+ QueryResourceManager.getInstance()
+ .endQueryForGivenJob(contextMap.remove(req.queryId).getJobId()); // NOTE(review): remove() returns null when queryId is not in the map, so this throws NullPointerException
+ }
+ }
+
private void clearAllStatusForCurrentRequest() {
if (this.queryRet.get() != null) {
this.queryRet.get().clear();
@@ -591,14 +606,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
int fetchSize = req.getFetch_size();
QueryDataSet queryDataSet;
if (!queryRet.get().containsKey(statement)) {
- PhysicalPlan physicalPlan = queryStatus.get().get(statement);
- processor.getExecutor().setFetchSize(fetchSize);
-
- context = new QueryContext();
- context.setJobId(QueryResourceManager.getInstance().assignJobId());
-
- queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, context);
- queryRet.get().put(statement, queryDataSet);
+ queryDataSet = createNewDataSet(statement, fetchSize, req);
} else {
queryDataSet = queryRet.get().get(statement);
}
@@ -618,6 +626,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ private QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req) // builds the data set for a statement, registers its context and caches the result
+ throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
+ ProcessorException, IOException {
+ PhysicalPlan physicalPlan = queryStatus.get().get(statement); // plan is looked up by the statement text
+ processor.getExecutor().setFetchSize(fetchSize);
+
+ QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId()); // a fresh job id for every new data set
+ Map<Long, QueryContext> contextMap = contextMapLocal.get();
+ if (contextMap == null) { // nothing stored for this thread yet: create the map lazily
+ contextMap = new HashMap<>();
+ contextMapLocal.set(contextMap);
+ }
+ contextMap.put(req.queryId, context); // NOTE(review): an existing context under the same queryId is overwritten without ending its job; ids appear to be numbered per IoTDBStatement from 0 - confirm they cannot collide
+
+ QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+ context); // NOTE(review): if processQuery throws, the context registered above stays in the map
+ queryRet.get().put(statement, queryDataSet); // cached under the statement text
+ return queryDataSet;
+ }
+
@Override
public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
throws TException {
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
index e622ead..c9b2b2c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
@@ -86,6 +86,7 @@ public class IoTDBQueryResultSet implements ResultSet {
private int rowsLimit = 0;
// 0 means it is not constrained in sql, or the offset position has been reached
private int rowsOffset = 0;
+ private long queryId;
/*
* Combine maxRows and the LIMIT constraints. maxRowsOrRowsLimit = 0 means that neither maxRows
@@ -107,7 +108,7 @@ public class IoTDBQueryResultSet implements ResultSet {
public IoTDBQueryResultSet(Statement statement, List<String> columnName, TSIService.Iface client,
TSOperationHandle operationHandle,
String sql, String aggregations,
- List<String> columnTypeList) throws SQLException {
+ List<String> columnTypeList, long queryId) throws SQLException {
this.statement = statement;
this.maxRows = statement.getMaxRows();
this.fetchSize = statement.getFetchSize();
@@ -121,6 +122,7 @@ public class IoTDBQueryResultSet implements ResultSet {
this.columnInfoList.add(TIMESTAMP_STR);
this.columnInfoMap = new HashMap<>();
this.columnInfoMap.put(TIMESTAMP_STR, 1);
+ this.queryId = queryId;
int index = 2;
for (String name : columnName) {
columnInfoList.add(name);
@@ -209,7 +211,7 @@ public class IoTDBQueryResultSet implements ResultSet {
private void closeOperationHandle() throws SQLException {
try {
if (operationHandle != null) {
- TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle);
+ TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, queryId);
TSCloseOperationResp closeResp = client.closeOperation(closeReq);
Utils.verifySuccess(closeResp.getStatus());
}
@@ -700,7 +702,7 @@ public class IoTDBQueryResultSet implements ResultSet {
// the next record rule without constraints
private boolean nextWithoutConstraints() throws SQLException {
if ((recordItr == null || !recordItr.hasNext()) && !emptyResultSet) {
- TSFetchResultsReq req = new TSFetchResultsReq(sql, fetchSize);
+ TSFetchResultsReq req = new TSFetchResultsReq(sql, fetchSize, queryId);
try {
TSFetchResultsResp resp = client.fetchResults(req);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 431d52e..c5e7235 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -29,6 +29,7 @@ import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationResp;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -64,6 +65,7 @@ public class IoTDBStatement implements Statement {
private TS_SessionHandle sessionHandle = null;
private TSOperationHandle operationHandle = null;
private List<String> batchSQLList;
+ private AtomicLong queryId = new AtomicLong(0);
/**
* Keep state so we can fail certain calls made after close().
*/
@@ -158,7 +160,7 @@ public class IoTDBStatement implements Statement {
private void closeClientOperation() throws SQLException {
try {
if (operationHandle != null) {
- TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle);
+ TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, -1);
TSCloseOperationResp closeResp = client.closeOperation(closeReq);
Utils.verifySuccess(closeResp.getStatus());
}
@@ -252,7 +254,7 @@ public class IoTDBStatement implements Statement {
if (execResp.getOperationHandle().hasResultSet) {
resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(), client,
operationHandle, sql, execResp.getOperationType(),
- getColumnsType(execResp.getColumns()));
+ getColumnsType(execResp.getColumns()), queryId.getAndIncrement());
return true;
}
return false;
@@ -347,7 +349,8 @@ public class IoTDBStatement implements Statement {
operationHandle = execResp.getOperationHandle();
Utils.verifySuccess(execResp.getStatus());
resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(), client,
- operationHandle, sql, execResp.getOperationType(), getColumnsType(execResp.getColumns()));
+ operationHandle, sql, execResp.getOperationType(), getColumnsType(execResp.getColumns()),
+ queryId.getAndIncrement());
return resultSet;
}
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index c603ae0..de18cfe 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -173,6 +173,7 @@ struct TSCancelOperationResp {
// CloseOperation()
struct TSCloseOperationReq {
1: required TSOperationHandle operationHandle
+ 2: required i64 queryId // the server treats -1 as "close every query of the calling thread"; NOTE(review): a new required field is presumably incompatible with peers generated from the previous IDL - confirm
}
struct TSCloseOperationResp {
@@ -203,6 +204,7 @@ struct TSQueryDataSet{
struct TSFetchResultsReq{
1: required string statement
2: required i32 fetch_size
+ 3: required i64 queryId // key under which the server stores the query's context; NOTE(review): a new required field is presumably incompatible with peers generated from the previous IDL - confirm
}
struct TSFetchResultsResp{