You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/04/02 11:58:46 UTC

[incubator-iotdb] branch unifiy_query_control updated: bug fix: multiple queries in the same statement will cause resource leak.

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch unifiy_query_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/unifiy_query_control by this push:
     new 6751e16  bug fix: multiple queries in the same statement will cause resource leak.
6751e16 is described below

commit 6751e16d81b904a17ca9eef9aa1e066ce996b802
Author: 江天 <jt...@163.com>
AuthorDate: Tue Apr 2 19:57:36 2019 +0800

    bug fix: multiple queries in the same statement will cause resource leak.
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 54 ++++++++++++++++------
 .../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java |  8 ++--
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  9 ++--
 service-rpc/src/main/thrift/rpc.thrift             |  2 +
 4 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad3c855..a68f586 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -79,6 +79,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.thrift.TException;
@@ -104,7 +105,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
   private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private QueryContext context;
+  private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
 
   public TSServiceImpl() throws IOException {
     // do nothing because there is no need
@@ -185,10 +186,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   public TSCloseOperationResp closeOperation(TSCloseOperationReq req) throws TException {
     LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
     try {
-      // end query for all the query tokens created by current thread
-      if(context != null) {
-        QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
-      }
+
+      releaseQueryResource(req);
 
       clearAllStatusForCurrentRequest();
     } catch (FileNodeManagerException e) {
@@ -197,6 +196,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return new TSCloseOperationResp(new TS_Status(TS_StatusCode.SUCCESS_STATUS));
   }
 
+  /** Ends the query req.queryId, or every query of this thread if req is null or the id is -1. */
+  private void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException {
+    Map<Long, QueryContext> active = contextMapLocal.get();
+    if (active == null) {
+      return;
+    }
+    if (req == null || req.queryId == -1) {
+      for (QueryContext context : active.values()) {
+        QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+      }
+      active.clear(); // forget the ended jobs so a later close cannot end them a second time
+    } else if (active.containsKey(req.queryId)) { // unknown id, e.g. closed before any fetch
+      QueryResourceManager.getInstance().endQueryForGivenJob(active.remove(req.queryId).getJobId());
+    }
+  }
+
   private void clearAllStatusForCurrentRequest() {
     if (this.queryRet.get() != null) {
       this.queryRet.get().clear();
@@ -591,14 +606,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       int fetchSize = req.getFetch_size();
       QueryDataSet queryDataSet;
       if (!queryRet.get().containsKey(statement)) {
-        PhysicalPlan physicalPlan = queryStatus.get().get(statement);
-        processor.getExecutor().setFetchSize(fetchSize);
-
-        context = new QueryContext();
-        context.setJobId(QueryResourceManager.getInstance().assignJobId());
-
-        queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, context);
-        queryRet.get().put(statement, queryDataSet);
+        queryDataSet = createNewDataSet(statement, fetchSize, req);
       } else {
         queryDataSet = queryRet.get().get(statement);
       }
@@ -618,6 +626,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  /** Runs the plan stored for statement under a newly assigned job id and caches the data set. */
+  private QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
+      throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
+      ProcessorException, IOException {
+    PhysicalPlan plan = queryStatus.get().get(statement);
+    processor.getExecutor().setFetchSize(fetchSize);
+    QueryContext queryContext = new QueryContext(QueryResourceManager.getInstance().assignJobId());
+    // file the context under the client-supplied query id, creating this thread's map on demand
+    Map<Long, QueryContext> threadContexts = contextMapLocal.get();
+    if (threadContexts == null) {
+      threadContexts = new HashMap<>();
+      contextMapLocal.set(threadContexts);
+    }
+    threadContexts.put(req.queryId, queryContext);
+    // the context is registered above, before the query is executed
+    QueryDataSet result = processor.getExecutor().processQuery((QueryPlan) plan, queryContext);
+    queryRet.get().put(statement, result);
+    return result;
+  }
+
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req)
       throws TException {
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
index e622ead..c9b2b2c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
@@ -86,6 +86,7 @@ public class IoTDBQueryResultSet implements ResultSet {
   private int rowsLimit = 0;
   // 0 means it is not constrained in sql, or the offset position has been reached
   private int rowsOffset = 0;
+  private long queryId;
 
   /*
    * Combine maxRows and the LIMIT constraints. maxRowsOrRowsLimit = 0 means that neither maxRows
@@ -107,7 +108,7 @@ public class IoTDBQueryResultSet implements ResultSet {
   public IoTDBQueryResultSet(Statement statement, List<String> columnName, TSIService.Iface client,
       TSOperationHandle operationHandle,
       String sql, String aggregations,
-      List<String> columnTypeList) throws SQLException {
+      List<String> columnTypeList, long queryId) throws SQLException {
     this.statement = statement;
     this.maxRows = statement.getMaxRows();
     this.fetchSize = statement.getFetchSize();
@@ -121,6 +122,7 @@ public class IoTDBQueryResultSet implements ResultSet {
     this.columnInfoList.add(TIMESTAMP_STR);
     this.columnInfoMap = new HashMap<>();
     this.columnInfoMap.put(TIMESTAMP_STR, 1);
+    this.queryId = queryId;
     int index = 2;
     for (String name : columnName) {
       columnInfoList.add(name);
@@ -209,7 +211,7 @@ public class IoTDBQueryResultSet implements ResultSet {
   private void closeOperationHandle() throws SQLException {
     try {
       if (operationHandle != null) {
-        TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle);
+        TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, queryId);
         TSCloseOperationResp closeResp = client.closeOperation(closeReq);
         Utils.verifySuccess(closeResp.getStatus());
       }
@@ -700,7 +702,7 @@ public class IoTDBQueryResultSet implements ResultSet {
   // the next record rule without constraints
   private boolean nextWithoutConstraints() throws SQLException {
     if ((recordItr == null || !recordItr.hasNext()) && !emptyResultSet) {
-      TSFetchResultsReq req = new TSFetchResultsReq(sql, fetchSize);
+      TSFetchResultsReq req = new TSFetchResultsReq(sql, fetchSize, queryId);
 
       try {
         TSFetchResultsResp resp = client.fetchResults(req);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 431d52e..c5e7235 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -29,6 +29,7 @@ import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -64,6 +65,7 @@ public class IoTDBStatement implements Statement {
   private TS_SessionHandle sessionHandle = null;
   private TSOperationHandle operationHandle = null;
   private List<String> batchSQLList;
+  private AtomicLong queryId = new AtomicLong(0);
   /**
    * Keep state so we can fail certain calls made after close().
    */
@@ -158,7 +160,7 @@ public class IoTDBStatement implements Statement {
   private void closeClientOperation() throws SQLException {
     try {
       if (operationHandle != null) {
-        TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle);
+        TSCloseOperationReq closeReq = new TSCloseOperationReq(operationHandle, -1);
         TSCloseOperationResp closeResp = client.closeOperation(closeReq);
         Utils.verifySuccess(closeResp.getStatus());
       }
@@ -252,7 +254,7 @@ public class IoTDBStatement implements Statement {
       if (execResp.getOperationHandle().hasResultSet) {
         resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(), client,
             operationHandle, sql, execResp.getOperationType(),
-            getColumnsType(execResp.getColumns()));
+            getColumnsType(execResp.getColumns()), queryId.getAndIncrement());
         return true;
       }
       return false;
@@ -347,7 +349,8 @@ public class IoTDBStatement implements Statement {
     operationHandle = execResp.getOperationHandle();
     Utils.verifySuccess(execResp.getStatus());
     resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(), client,
-        operationHandle, sql, execResp.getOperationType(), getColumnsType(execResp.getColumns()));
+        operationHandle, sql, execResp.getOperationType(), getColumnsType(execResp.getColumns()),
+        queryId.getAndIncrement());
     return resultSet;
   }
 
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index c603ae0..de18cfe 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -173,6 +173,7 @@ struct TSCancelOperationResp {
 // CloseOperation()
 struct TSCloseOperationReq {
   1: required TSOperationHandle operationHandle
+  2: required i64 queryId // -1 asks the server to end every query of the calling thread
 }
 
 struct TSCloseOperationResp {
@@ -203,6 +204,7 @@ struct TSQueryDataSet{
 struct TSFetchResultsReq{
 	1: required string statement
 	2: required i32 fetch_size
+	3: required i64 queryId // id the server uses as the key of this query's QueryContext
 }
 
 struct TSFetchResultsResp{