You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/07 09:03:01 UTC
[incubator-iotdb] branch master updated: [IOTDB-291] Statement
close operation may cause the whole connection's resource to be released
(#526)
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 37e6840 [IOTDB-291] Statement close operation may cause the whole connection's resource to be released (#526)
37e6840 is described below
commit 37e684006f65390a58d49866cd1c8ecec68d0f83
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Nov 7 17:02:55 2019 +0800
[IOTDB-291] Statement close operation may cause the whole connection's resource to be released (#526)
* 1. maintain the rpc changelist; 2. change the field name
* IOTDB-291 Statement close operation may cause the whole connection's resource to be released
---
.../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 30 +--
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 25 ++-
.../test/java/org/apache/iotdb/jdbc/BatchTest.java | 29 +--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 149 +++++++++------
.../apache/iotdb/db/integration/IoTDBCloseIT.java | 202 +++++++++++++++++++++
service-rpc/rpc-changelist.md | 1 +
service-rpc/src/main/thrift/rpc.thrift | 3 +
.../java/org/apache/iotdb/session/Session.java | 44 ++---
8 files changed, 355 insertions(+), 128 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
index e54b16d..69425e8 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
@@ -18,37 +18,20 @@
*/
package org.apache.iotdb.jdbc;
+import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
import java.sql.Date;
-import java.sql.NClob;
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Ref;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.RowId;
-import java.sql.SQLException;
-import java.sql.SQLXML;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.sql.*;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
-import org.apache.thrift.TException;
+import java.util.*;
public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedStatement {
@@ -63,12 +46,11 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
public IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
TS_SessionHandle sessionHandle, ZoneId zoneId) throws SQLException{
super(connection, client, sessionHandle, zoneId);
- requestStmtId();
}
public IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
TS_SessionHandle sessionHandle, String sql,
- ZoneId zoneId) {
+ ZoneId zoneId) throws SQLException {
super(connection, client, sessionHandle, zoneId);
this.sql = sql;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 956d778..1a5c7cd 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -79,18 +79,31 @@ public class IoTDBStatement implements Statement {
*/
IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
TS_SessionHandle sessionHandle,
- int fetchSize, ZoneId zoneId) {
+ int fetchSize, ZoneId zoneId) throws SQLException {
this.connection = connection;
this.client = client;
this.sessionHandle = sessionHandle;
this.fetchSize = fetchSize;
this.batchSQLList = new ArrayList<>();
this.zoneId = zoneId;
+ requestStmtId();
+ }
+
+ // Test-only constructor: uses the supplied statementId directly instead of calling requestStmtId().
+ IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
+ TS_SessionHandle sessionHandle, ZoneId zoneId, long statementId) throws SQLException {
+ this.connection = connection;
+ this.client = client;
+ this.sessionHandle = sessionHandle;
+ this.fetchSize = Config.fetchSize;
+ this.batchSQLList = new ArrayList<>();
+ this.zoneId = zoneId;
+ this.stmtId = statementId;
}
IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
TS_SessionHandle sessionHandle,
- ZoneId zoneId) {
+ ZoneId zoneId) throws SQLException {
this(connection, client, sessionHandle, Config.fetchSize, zoneId);
}
@@ -294,7 +307,7 @@ public class IoTDBStatement implements Statement {
resultSet = databaseMetadata.getColumns(Constant.CATALOG_VERSION, null, null, null);
return true;
} else {
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
TSExecuteStatementResp execResp = client.executeStatement(execReq);
operationHandle = execResp.getOperationHandle();
try {
@@ -395,7 +408,7 @@ public class IoTDBStatement implements Statement {
private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
isCancelled = false;
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
operationHandle = execResp.getOperationHandle();
try {
@@ -450,7 +463,7 @@ public class IoTDBStatement implements Statement {
}
private int executeUpdateSQL(String sql) throws TException, IoTDBSQLException {
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, stmtId);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
operationHandle = execResp.getOperationHandle();
try {
@@ -622,7 +635,7 @@ public class IoTDBStatement implements Statement {
this.sessionHandle = connection.sessionHandle;
}
- void requestStmtId() throws SQLException {
+ private void requestStmtId() throws SQLException {
try {
this.stmtId = client.requestStatementId();
} catch (TException e) {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
index 4784f0c..1566d49 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
@@ -18,18 +18,6 @@
*/
package org.apache.iotdb.jdbc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import java.sql.BatchUpdateException;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;
@@ -39,6 +27,18 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
public class BatchTest {
@Mock
@@ -47,6 +47,8 @@ public class BatchTest {
private TSIService.Iface client;
@Mock
private TS_SessionHandle sessHandle;
+ @Mock
+ private IoTDBStatement statement;
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatusType errorStatus = new TSStatusType(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
@@ -58,7 +60,8 @@ public class BatchTest {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(connection.createStatement())
- .thenReturn(new IoTDBStatement(connection, client, sessHandle, zoneID));
+ .thenReturn(new IoTDBStatement(connection, client, sessHandle, zoneID, 1L));
+
}
@After
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ba39cdc..5dd8878 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -71,8 +71,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import static org.apache.iotdb.db.conf.IoTDBConstant.*;
@@ -94,21 +92,20 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// login is failed.
protected ThreadLocal<String> username = new ThreadLocal<>();
+ // The statementId is unique in one session for each statement.
+ private ThreadLocal<Long> statementIdGenerator = new ThreadLocal<>();
// The queryId is unique in one session for each operation.
- protected ThreadLocal<AtomicLong> queryId = new ThreadLocal<>();
+ private ThreadLocal<Long> queryIdGenerator = new ThreadLocal<>();
+ // (statement -> Set(queryId))
+ private ThreadLocal<Map<Long, Set<Long>>> statementId2QueryId = new ThreadLocal<>();
// (queryId -> PhysicalPlan)
- private ThreadLocal<HashMap<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
+ private ThreadLocal<Map<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
// (queryId -> QueryDataSet)
- private ThreadLocal<HashMap<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
+ private ThreadLocal<Map<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
- private AtomicLong globalStmtId = new AtomicLong(0L);
- // (statementId) -> (statement)
- // TODO: remove unclosed statements
- private Map<Long, PhysicalPlan> idStmtMap = new ConcurrentHashMap<>();
-
public TSServiceImpl() {
processor = new QueryProcessor(new QueryProcessExecutor());
}
@@ -154,7 +151,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private void initForOneSession() {
operationStatus.set(new HashMap<>());
queryDataSets.set(new HashMap<>());
- queryId.set(new AtomicLong(0L));
+ queryIdGenerator.set(0L);
+ statementIdGenerator.set(0L);
+ contextMapLocal.set(new HashMap<>());
+ statementId2QueryId.set(new HashMap<>());
}
@Override
@@ -163,16 +163,41 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSStatus tsStatus;
if (username.get() == null) {
tsStatus = getStatus(TSStatusCode.NOT_LOGIN_ERROR);
- if (zoneIds.get() != null) {
- zoneIds.remove();
- }
} else {
tsStatus = new TSStatus(getStatus(TSStatusCode.SUCCESS_STATUS));
username.remove();
- if (zoneIds.get() != null) {
- zoneIds.remove();
+ }
+ if (zoneIds.get() != null) {
+ zoneIds.remove();
+ }
+ // clear the statementId counter
+ if (statementIdGenerator.get() != null)
+ statementIdGenerator.remove();
+ // clear the queryId counter
+ if (queryIdGenerator.get() != null)
+ queryIdGenerator.remove();
+ // clear all cached physical plans of the connection
+ if (operationStatus.get() != null)
+ operationStatus.remove();
+ // clear all cached ResultSets of the connection
+ if (queryDataSets.get() != null)
+ queryDataSets.remove();
+ // clear all cached query context of the connection
+ if (contextMapLocal.get() != null) {
+ try {
+ for (QueryContext context : contextMapLocal.get().values()) {
+ QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+ }
+ contextMapLocal.remove();
+ } catch (StorageEngineException e) {
+ logger.error("Error in closeSession : ", e);
+ return new TSStatus(getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, "Error in closeOperation"));
}
}
+ // clear the statementId to queryId map
+ if (statementId2QueryId.get() != null)
+ statementId2QueryId.remove();
+
return new TSStatus(tsStatus);
}
@@ -185,15 +210,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public TSStatus closeOperation(TSCloseOperationReq req) {
logger.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
try {
-
- if (req != null && req.isSetStmtId()) {
+ // statement close
+ if (req.isSetStmtId()) {
long stmtId = req.getStmtId();
- idStmtMap.remove(stmtId);
+ Set<Long> queryIdSet = statementId2QueryId.get().get(stmtId);
+ if (queryIdSet != null) {
+ for (long queryId : queryIdSet)
+ releaseQueryResource(queryId);
+ statementId2QueryId.get().remove(stmtId);
+ }
+ }
+ // ResultSet close
+ else {
+ releaseQueryResource(req.queryId);
}
- releaseQueryResource(req);
-
- clearAllStatusForCurrentRequest();
} catch (Exception e) {
logger.error("Error in closeOperation : ", e);
return new TSStatus(getStatus(TSStatusCode.CLOSE_OPERATION_ERROR, "Error in closeOperation"));
@@ -201,32 +232,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return new TSStatus(getStatus(TSStatusCode.SUCCESS_STATUS));
}
- private void releaseQueryResource(TSCloseOperationReq req) throws StorageEngineException {
- Map<Long, QueryContext> contextMap = contextMapLocal.get();
- if (contextMap == null) {
- return;
- }
- if (req == null || req.queryId == -1) {
- // end query for all the query tokens created by current thread
- for (QueryContext context : contextMap.values()) {
- QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
- }
- contextMapLocal.set(new HashMap<>());
- } else {
- QueryResourceManager.getInstance()
- .endQueryForGivenJob(contextMap.remove(req.queryId).getJobId());
- }
- }
- private void clearAllStatusForCurrentRequest() {
- if (this.queryDataSets.get() != null) {
- this.queryDataSets.get().clear();
- }
- if (this.operationStatus.get() != null) {
- this.operationStatus.get().clear();
- }
- if (this.queryId.get() != null) {
- this.queryId.get().set(0L);
+ /**
+ * Releases everything cached for a single query id in the current thread's ThreadLocal maps.
+ * @param queryId key of the cached physical plan, query data set and query context to remove
+ * @throws StorageEngineException presumably raised by QueryResourceManager#endQueryForGivenJob - confirm
+ */
+ private void releaseQueryResource(long queryId) throws StorageEngineException {
+
+ // remove the physical plan cached under this queryId, if the map has been initialised
+ if (operationStatus.get() != null)
+ operationStatus.get().remove(queryId);
+ // remove the query data set cached under this queryId, if the map has been initialised
+ if (queryDataSets.get() != null)
+ queryDataSets.get().remove(queryId);
+ // remove the query context and end its job; skipped when no context is registered for this queryId
+ if (contextMapLocal.get() != null && contextMapLocal.get().containsKey(queryId)) {
+ QueryResourceManager.getInstance()
+ .endQueryForGivenJob(contextMapLocal.get().remove(queryId).getJobId());
}
}
@@ -564,7 +587,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan;
physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
if (physicalPlan.isQuery()) {
- resp = executeQueryStatement(physicalPlan);
+ resp = executeQueryStatement(req.statementId, physicalPlan);
long endTime = System.currentTimeMillis();
sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
sqlArgumentsList.add(sqlArgument);
@@ -630,7 +653,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private TSExecuteStatementResp executeQueryStatement(PhysicalPlan plan) {
+ private TSExecuteStatementResp executeQueryStatement(long statementId, PhysicalPlan plan) {
long t1 = System.currentTimeMillis();
try {
TSExecuteStatementResp resp;
@@ -645,8 +668,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setIgnoreTimeStamp(true);
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
+ // generate the queryId for the operation
+ long queryId = generateQueryId();
+ // put it into the corresponding Set
+ Set<Long> queryIdSet = statementId2QueryId.get().computeIfAbsent(statementId, k -> new HashSet<>());
+ queryIdSet.add(queryId);
+
TSHandleIdentifier operationId = new TSHandleIdentifier(
- ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+ ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId);
TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
resp.setOperationHandle(operationHandle);
@@ -681,7 +710,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
"Statement is not a query statement."));
}
- return executeQueryStatement(physicalPlan);
+ return executeQueryStatement(req.statementId, physicalPlan);
}
private List<String> queryColumnsType(List<String> columns) throws PathErrorException {
@@ -952,9 +981,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
status = executePlan(plan);
TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
+ long queryId = generateQueryId();
TSHandleIdentifier operationId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+ ByteBuffer.wrap("PASS".getBytes()), queryId);
TSOperationHandle operationHandle;
operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
@@ -1014,7 +1044,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setStatus(tsStatus);
TSHandleIdentifier operationId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
+ ByteBuffer.wrap("PASS".getBytes()), generateQueryId());
TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
return resp;
@@ -1037,7 +1067,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
void handleClientExit() {
- closeOperation(null);
closeSession(null);
}
@@ -1091,7 +1120,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
long stmtId = req.getStmtId();
- InsertPlan plan = (InsertPlan) idStmtMap.computeIfAbsent(stmtId, k -> new InsertPlan());
+ InsertPlan plan = (InsertPlan) operationStatus.get().computeIfAbsent(stmtId, k -> new InsertPlan());
// the old parameter will be used if new parameter is not set
if (req.isSetDeviceId()) {
@@ -1277,7 +1306,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public long requestStatementId() {
- return globalStmtId.incrementAndGet();
+ long statementId = statementIdGenerator.get();
+ statementIdGenerator.set(statementId+1);
+ return statementId;
}
private TSStatus checkAuthority(PhysicalPlan plan) {
@@ -1314,5 +1345,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
: getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
+
+ private long generateQueryId() {
+ long queryId = queryIdGenerator.get();
+ queryIdGenerator.set(queryId+1);
+ return queryId;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java
new file mode 100644
index 0000000..08306fe
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCloseIT.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Note that every test whose name begins with "IoTDB" is an integration test. Any test that starts the IoTDB server
+ * should be defined as an integration test.
+ */
+public class IoTDBCloseIT {
+
+ private static IoTDB daemon;
+
+ private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+ private static int maxNumberOfPointsInPage;
+ private static int pageSizeInByte;
+ private static int groupSizeInByte;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
+ EnvironmentUtils.closeStatMonitor();
+
+ // run this test class with small TsFile page / group settings
+ // remember the current values so that tearDown can restore them
+ maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+ pageSizeInByte = tsFileConfig.getPageSizeInByte();
+ groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+
+ // apply the values used by this test
+ tsFileConfig.setMaxNumberOfPointsInPage(1000);
+ tsFileConfig.setPageSizeInByte(1024 * 150);
+ tsFileConfig.setGroupSizeInByte(1024 * 1000);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ daemon.stop();
+ // restore the values saved in setUp; NOTE(review): the memtable threshold is reset to groupSizeInByte, not to a saved original - confirm
+ tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+ tsFileConfig.setPageSizeInByte(pageSizeInByte);
+ tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ private static void insertData()
+ throws ClassNotFoundException, SQLException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : Constant.create_sql) {
+ statement.execute(sql);
+ }
+
+ statement.execute("SET STORAGE GROUP TO root.fans");
+ statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.fans.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+ for (int time = 1; time < 10; time++) { // timestamps 1..9: one s0 insert and one s1 insert each
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 10);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d0(timestamp,s1) values(%s,%s)", time, time % 5);
+ statement.execute(sql);
+ }
+
+ } catch (Exception e) { // any failure while preparing the data fails the test class
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // closing one statement must not affect other statements (or their ResultSets) on the same connection
+ @Test
+ public void statementCloseTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "1,1,1",
+ "2,2,2",
+ "3,3,3",
+ "4,4,4",
+ "5,5,0",
+ "6,6,1",
+ "7,7,2",
+ "8,8,3",
+ "9,9,4"
+ };
+
+ String selectSql = "select * from root";
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ ) {
+ Statement statement1 = connection.createStatement();
+ Statement statement2 = connection.createStatement();
+ statement1.setFetchSize(10);
+ // statement1 must stay open here: it is executed below and asserted open after statement2.close()
+ boolean hasResultSet1 = statement1.execute(selectSql);
+ Assert.assertTrue(hasResultSet1);
+ ResultSet resultSet1 = statement1.getResultSet();
+ int cnt1 = 0;
+ while (resultSet1.next() && cnt1 < 5) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt1], builder.toString());
+ cnt1++;
+ }
+
+ statement2.setFetchSize(10);
+ boolean hasResultSet2 = statement2.execute(selectSql);
+ Assert.assertTrue(hasResultSet2);
+ ResultSet resultSet2 = statement2.getResultSet();
+ int cnt2 = 0;
+ while (resultSet2.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet2.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet2.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet2.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt2], builder.toString());
+ cnt2++;
+ }
+ Assert.assertEquals(9, cnt2);
+
+ // manually close the statement2 and this operation shouldn't affect the statement1
+ statement2.close();
+ Assert.assertTrue(statement2.isClosed());
+ Assert.assertFalse(statement1.isClosed());
+
+
+ // use do-while instead of while because in the previous while loop, we have executed the next function,
+ // and the cursor has been moved to the next position, so we should fetch that value first.
+ do {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt1], builder.toString());
+ cnt1++;
+ } while (resultSet1.next());
+ // Although the statement2 has the same sql as statement1, they shouldn't affect each other.
+ // So the statement1's ResultSet should also have 9 rows in total.
+ Assert.assertEquals(9, cnt1);
+ statement1.close();
+ Assert.assertTrue(statement1.isClosed());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 7fbf1bf..5e08140 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -74,6 +74,7 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add required i64 queryId in TSHandleIdentifier | Yuan Tian |
| Add optional set\<string> childPaths in TSFetchMetadataResp | Haonan Hou |
| Add optional string version in TSFetchMetadataResp | Genius_pig |
+| Add required i64 statementId in TSExecuteStatementReq | Yuan Tian |
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 39f299f..6df57c5 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -121,6 +121,9 @@ struct TSExecuteStatementReq {
// The statement to be executed (DML, DDL, SET, etc)
2: required string statement
+
+ // statementId
+ 3: required i64 statementId
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index f5d7d54..3893f30 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,35 +18,11 @@
*/
package org.apache.iotdb.session;
-import static org.apache.iotdb.session.Config.PATH_MATCHER;
-
-import java.sql.SQLException;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,6 +37,14 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.session.Config.PATH_MATCHER;
+
public class Session {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
@@ -75,8 +59,8 @@ public class Session {
private boolean isClosed = true;
private ZoneId zoneId;
private RowRecord record;
- private AtomicLong queryId = new AtomicLong(0);
private TSOperationHandle operationHandle;
+ private long statementId;
public Session(String host, int port) {
@@ -135,6 +119,8 @@ public class Session {
sessionHandle = openResp.getSessionHandle();
+ statementId = client.requestStatementId();
+
if (zoneId != null) {
setTimeZone(zoneId.toString());
} else {
@@ -357,13 +343,13 @@ public class Session {
+ "\" is not a query statement, you should use executeNonQueryStatement method instead.");
}
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
TSExecuteStatementResp execResp = client.executeStatement(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
operationHandle = execResp.getOperationHandle();
return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
- queryId.incrementAndGet(), client, operationHandle);
+ operationHandle.getOperationId().getQueryId(), client, operationHandle);
}
/**
@@ -377,7 +363,7 @@ public class Session {
+ "\" is a query statement, you should use executeQueryStatement method instead.");
}
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql);
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
operationHandle = execResp.getOperationHandle();