You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/06 06:19:08 UTC
[incubator-iotdb] branch master updated: IOTDB-290 Bug about
threadlocal field in TSServiceImpl.java (#522)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d78790 IOTDB-290 Bug about threadlocal field in TSServiceImpl.java (#522)
7d78790 is described below
commit 7d787906d462542a5719c3b961fadd7613141f37
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Wed Nov 6 14:19:02 2019 +0800
IOTDB-290 Bug about threadlocal field in TSServiceImpl.java (#522)
* queryId threadlocal sessionid
---
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 36 ++--
.../apache/iotdb/jdbc/IoTDBQueryResultSetTest.java | 42 ++---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 147 ++++++----------
.../iotdb/db/integration/IoTDBAggregationIT.java | 23 +--
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 13 +-
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 13 +-
.../db/integration/IoTDBMultiStatementsIT.java | 193 +++++++++++++++++++++
.../iotdb/db/integration/IoTDBQueryDemoIT.java | 13 +-
service-rpc/rpc-changelist.md | 1 +
service-rpc/src/main/thrift/rpc.thrift | 3 +
10 files changed, 294 insertions(+), 190 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index ae8afe0..956d778 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -19,32 +19,17 @@
package org.apache.iotdb.jdbc;
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;
+import java.sql.*;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
public class IoTDBStatement implements Statement {
private static final String SHOW_TIMESERIES_COMMAND_LOWERCASE = "show timeseries";
@@ -65,7 +50,6 @@ public class IoTDBStatement implements Statement {
private TS_SessionHandle sessionHandle;
private TSOperationHandle operationHandle = null;
private List<String> batchSQLList;
- private AtomicLong queryId = new AtomicLong(0);
/**
* Keep state so we can fail certain calls made after close().
*/
@@ -319,9 +303,9 @@ public class IoTDBStatement implements Statement {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
if (execResp.getOperationHandle().hasResultSet) {
- this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
- execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, operationHandle, sql,
- queryId.getAndIncrement());
+ this.resultSet = new IoTDBQueryResultSet(this,
+ execResp.getColumns(), execResp.getDataTypeList(),
+ execResp.ignoreTimeStamp, client, operationHandle, sql, operationHandle.getOperationId().getQueryId());
return true;
}
return false;
@@ -421,7 +405,7 @@ public class IoTDBStatement implements Statement {
}
this.resultSet = new IoTDBQueryResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.ignoreTimeStamp, client, operationHandle, sql,
- queryId.getAndIncrement());
+ operationHandle.getOperationId().getQueryId());
return resultSet;
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
index 0235ce7..5bef7f0 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBQueryResultSetTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.iotdb.jdbc;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -35,26 +38,9 @@ import java.sql.Types;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
/*
This class is designed to test the function of TsfileQueryResultSet.
@@ -109,6 +95,8 @@ public class IoTDBQueryResultSetTest {
@Mock
TSOperationHandle operationHandle;
@Mock
+ TSHandleIdentifier handleIdentifier;
+ @Mock
private IoTDBConnection connection;
@Mock
private TSIService.Iface client;
@@ -135,7 +123,11 @@ public class IoTDBQueryResultSetTest {
when(connection.isClosed()).thenReturn(false);
when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execResp);
operationHandle.hasResultSet = true;
+ operationHandle.operationId = handleIdentifier;
+ handleIdentifier.queryId = 1L;
when(execResp.getOperationHandle()).thenReturn(operationHandle);
+ when(operationHandle.getOperationId()).thenReturn(handleIdentifier);
+ when(handleIdentifier.getQueryId()).thenReturn(1L);
when(execResp.getStatus()).thenReturn(Status_SUCCESS);
when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0d77ac9..ba39cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,28 +18,6 @@
*/
package org.apache.iotdb.db.service;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -53,14 +31,7 @@ import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
-import org.apache.iotdb.db.exception.MetadataErrorException;
-import org.apache.iotdb.db.exception.NotStorageGroupException;
-import org.apache.iotdb.db.exception.OutOfTTLException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.QueryInBatchStmtException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.StorageGroupException;
+import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metrics.server.SqlArgument;
@@ -73,47 +44,14 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
-import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
+import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -127,6 +65,18 @@ import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
/**
* Thrift RPC implementation at server side.
*/
@@ -143,8 +93,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// Record the username for every rpc connection. Username.get() is null if
// login is failed.
protected ThreadLocal<String> username = new ThreadLocal<>();
- private ThreadLocal<HashMap<String, PhysicalPlan>> queryStatus = new ThreadLocal<>();
- private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>();
+
+ // The queryId is unique in one session for each operation.
+ protected ThreadLocal<AtomicLong> queryId = new ThreadLocal<>();
+ // (queryId -> PhysicalPlan)
+ private ThreadLocal<HashMap<Long, PhysicalPlan>> operationStatus = new ThreadLocal<>();
+ // (queryId -> QueryDataSet)
+ private ThreadLocal<HashMap<Long, QueryDataSet>> queryDataSets = new ThreadLocal<>();
private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
@@ -189,7 +144,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
resp.setSessionHandle(
new TS_SessionHandle(new TSHandleIdentifier(ByteBuffer.wrap(req.getUsername().getBytes()),
- ByteBuffer.wrap(req.getPassword().getBytes()))));
+ ByteBuffer.wrap(req.getPassword().getBytes()), -1L)));
logger.info("{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME,
tsStatus.getStatusType().getMessage(), req.getUsername());
@@ -197,8 +152,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private void initForOneSession() {
- queryStatus.set(new HashMap<>());
- queryRet.set(new HashMap<>());
+ operationStatus.set(new HashMap<>());
+ queryDataSets.set(new HashMap<>());
+ queryId.set(new AtomicLong(0L));
}
@Override
@@ -263,11 +219,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private void clearAllStatusForCurrentRequest() {
- if (this.queryRet.get() != null) {
- this.queryRet.get().clear();
+ if (this.queryDataSets.get() != null) {
+ this.queryDataSets.get().clear();
}
- if (this.queryStatus.get() != null) {
- this.queryStatus.get().clear();
+ if (this.operationStatus.get() != null) {
+ this.operationStatus.get().clear();
+ }
+ if (this.queryId.get() != null) {
+ this.queryId.get().set(0L);
}
}
@@ -605,7 +564,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan;
physicalPlan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get());
if (physicalPlan.isQuery()) {
- resp = executeQueryStatement(statement, physicalPlan);
+ resp = executeQueryStatement(physicalPlan);
long endTime = System.currentTimeMillis();
sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
sqlArgumentsList.add(sqlArgument);
@@ -671,7 +630,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private TSExecuteStatementResp executeQueryStatement(String statement, PhysicalPlan plan) {
+ private TSExecuteStatementResp executeQueryStatement(PhysicalPlan plan) {
long t1 = System.currentTimeMillis();
try {
TSExecuteStatementResp resp;
@@ -687,11 +646,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
TSHandleIdentifier operationId = new TSHandleIdentifier(
- ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()));
+ ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
resp.setOperationHandle(operationHandle);
- recordANewQuery(statement, plan);
+ recordANewQuery(operationId.queryId, plan);
return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
@@ -722,7 +681,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
"Statement is not a query statement."));
}
- return executeQueryStatement(statement, physicalPlan);
+ return executeQueryStatement(physicalPlan);
}
private List<String> queryColumnsType(List<String> columns) throws PathErrorException {
@@ -893,17 +852,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSFetchResultsResp(getStatus(TSStatusCode.NOT_LOGIN_ERROR));
}
- String statement = req.getStatement();
- if (!queryStatus.get().containsKey(statement)) {
+ long queryId = req.queryId;
+ if (!operationStatus.get().containsKey(queryId)) {
return getTSFetchResultsResp(
getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed statement"));
}
QueryDataSet queryDataSet;
- if (!queryRet.get().containsKey(statement)) {
- queryDataSet = createNewDataSet(statement, req);
+ if (!queryDataSets.get().containsKey(queryId)) {
+ queryDataSet = createNewDataSet(queryId, req);
} else {
- queryDataSet = queryRet.get().get(statement);
+ queryDataSet = queryDataSets.get().get(queryId);
}
int fetchSize = req.getFetch_size();
@@ -928,8 +887,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
result = QueryDataSetUtils.convertQueryDataSetByFetchSize(queryDataSet, fetchSize);
}
boolean hasResultSet = (result.getRowCount() != 0);
- if (!hasResultSet && queryRet.get() != null) {
- queryRet.get().remove(statement);
+ if (!hasResultSet && queryDataSets.get() != null) {
+ queryDataSets.get().remove(queryId);
}
TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
@@ -943,10 +902,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
- private QueryDataSet createNewDataSet(String statement, TSFetchResultsReq req)
+ private QueryDataSet createNewDataSet(long queryId, TSFetchResultsReq req)
throws PathErrorException, QueryFilterOptimizationException, StorageEngineException,
ProcessorException, IOException {
- PhysicalPlan physicalPlan = queryStatus.get().get(statement);
+ PhysicalPlan physicalPlan = operationStatus.get().get(queryId);
QueryDataSet queryDataSet;
QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
@@ -957,7 +916,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
queryDataSet = processor.getExecutor().processQuery(physicalPlan,
context);
- queryRet.get().put(statement, queryDataSet);
+ queryDataSets.get().put(req.queryId, queryDataSet);
return queryDataSet;
}
@@ -995,7 +954,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
TSHandleIdentifier operationId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap("PASS".getBytes()));
+ ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
TSOperationHandle operationHandle;
operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
@@ -1028,10 +987,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return executeUpdateStatement(physicalPlan);
}
- private void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
- queryStatus.get().put(statement, physicalPlan);
- // refresh current queryRet for statement
- queryRet.get().remove(statement);
+ private void recordANewQuery(long queryId, PhysicalPlan physicalPlan) {
+ operationStatus.get().put(queryId, physicalPlan);
}
/**
@@ -1057,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setStatus(tsStatus);
TSHandleIdentifier operationId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap("PASS".getBytes()));
+ ByteBuffer.wrap("PASS".getBytes()), queryId.get().getAndIncrement());
TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
resp.setOperationHandle(operationHandle);
return resp;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index b754722..f9e0efa 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -19,23 +19,6 @@
package org.apache.iotdb.db.integration;
-import static org.apache.iotdb.db.integration.Constant.avg;
-import static org.apache.iotdb.db.integration.Constant.count;
-import static org.apache.iotdb.db.integration.Constant.first;
-import static org.apache.iotdb.db.integration.Constant.last;
-import static org.apache.iotdb.db.integration.Constant.max_time;
-import static org.apache.iotdb.db.integration.Constant.max_value;
-import static org.apache.iotdb.db.integration.Constant.min_time;
-import static org.apache.iotdb.db.integration.Constant.min_value;
-import static org.apache.iotdb.db.integration.Constant.sum;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Locale;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -44,6 +27,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.*;
+import java.util.Locale;
+
+import static org.apache.iotdb.db.integration.Constant.*;
+import static org.junit.Assert.fail;
+
public class IoTDBAggregationIT {
private static IoTDB daemon;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index c1da2e7..0c4203c 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -38,6 +29,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
* defined as integration test.
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
index 9c659b0..e98f2a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -38,6 +29,10 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
* defined as integration test.
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
new file mode 100644
index 0000000..7e39ae4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
+ * defined as integration test.
+ */
+public class IoTDBMultiStatementsIT {
+
+ private static IoTDB daemon;
+
+ private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+ private static int maxNumberOfPointsInPage;
+ private static int pageSizeInByte;
+ private static int groupSizeInByte;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
+ EnvironmentUtils.closeStatMonitor();
+
+ // use small page setting
+ // origin value
+ maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+ pageSizeInByte = tsFileConfig.getPageSizeInByte();
+ groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+
+ // new value
+ tsFileConfig.setMaxNumberOfPointsInPage(1000);
+ tsFileConfig.setPageSizeInByte(1024 * 150);
+ tsFileConfig.setGroupSizeInByte(1024 * 1000);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ daemon.stop();
+ // recovery value
+ tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+ tsFileConfig.setPageSizeInByte(pageSizeInByte);
+ tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ private static void insertData()
+ throws ClassNotFoundException, SQLException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : Constant.create_sql) {
+ statement.execute(sql);
+ }
+
+ statement.execute("SET STORAGE GROUP TO root.fans");
+ statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+ statement.execute("CREATE TIMESERIES root.fans.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+ for (int time = 1; time < 10; time++) {
+
+ String sql = String
+ .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 10);
+ statement.execute(sql);
+ sql = String
+ .format("insert into root.fans.d0(timestamp,s1) values(%s,%s)", time, time % 5);
+ statement.execute(sql);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // "select * from root" run on two statements of one connection: interleaved reads of the
+ // same SQL must not disturb each other. Statements stay open; try-with-resources closes them.
+ @Test
+ public void selectAllTest() throws ClassNotFoundException {
+ String[] retArray = new String[]{
+ "1,1,1",
+ "2,2,2",
+ "3,3,3",
+ "4,4,4",
+ "5,5,0",
+ "6,6,1",
+ "7,7,2",
+ "8,8,3",
+ "9,9,4"
+ };
+
+ String selectSql = "select * from root";
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement1 = connection.createStatement();
+ Statement statement2 = connection.createStatement()) {
+ statement1.setFetchSize(10);
+ boolean hasResultSet1 = statement1.execute(selectSql);
+ Assert.assertTrue(hasResultSet1);
+ ResultSet resultSet1 = statement1.getResultSet();
+ int cnt1 = 0;
+ while (resultSet1.next() && cnt1 < 5) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt1], builder.toString());
+ cnt1++;
+ }
+
+ statement2.setFetchSize(10);
+ boolean hasResultSet2 = statement2.execute(selectSql);
+ Assert.assertTrue(hasResultSet2);
+ ResultSet resultSet2 = statement2.getResultSet();
+ int cnt2 = 0;
+ while (resultSet2.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet2.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet2.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet2.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt2], builder.toString());
+ cnt2++;
+ }
+ Assert.assertEquals(9, cnt2);
+
+ // use do-while instead of while because in the previous while loop, we have executed the next function,
+ // and the cursor has been moved to the next position, so we should fetch that value first.
+ do {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet1.getString(Constant.TIMESTAMP_STR))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s0"))
+ .append(",")
+ .append(resultSet1.getString("root.fans.d0.s1"));
+ Assert.assertEquals(retArray[cnt1], builder.toString());
+ cnt1++;
+ } while (resultSet1.next());
+ // Although the statement2 has the same sql as statement1, they shouldn't affect each other.
+ // So the statement1's ResultSet should also have 9 rows in total.
+ Assert.assertEquals(9, cnt1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
index e44fb3b..d334485 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryDemoIT.java
@@ -18,15 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -35,6 +26,10 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.sql.*;
+
+import static org.junit.Assert.fail;
+
public class IoTDBQueryDemoIT {
private static IoTDB daemon;
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 0b4f895..7fbf1bf 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -71,6 +71,7 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Rename some fields in TSFetchMetadataResp: ~~ColumnsList~~ to columnsList, ~~showTimeseriesList~~ to timeseriesList, ~~showStorageGroups~~ to storageGroups | Zesong Sun |
| Change struct TSQueryDataSet to eliminate row-wise rpc writing | Lei Rui |
| Add optional i32 timeseriesNum in TSFetchMetadataResp | Jack Tsai |
+| Add required i64 queryId in TSHandleIdentifier | Yuan Tian |
| Add optional set\<string> childPaths in TSFetchMetadataResp | Haonan Hou |
| Add optional string version in TSFetchMetadataResp | Genius_pig |
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 8a630a6..39f299f 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -39,6 +39,9 @@ struct TSHandleIdentifier {
// 16 byte secret generated by the server and used to verify that the handle is not being hijacked by another user.
// In current version, it is not used.
2: required binary secret,
+
+ // unique identifier in session. This is a ID to identify a query in one session.
+ 3: required i64 queryId,
}
// Client-side reference to a task running asynchronously on the server.