You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/12 03:15:05 UTC
[incubator-iotdb] 01/01: update thrift file
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_session_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7480e606d5764f671dbc2ae0ea44b94a2a6d95dd
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Dec 12 11:14:43 2019 +0800
update thrift file
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 60 +++----
service-rpc/rpc-changelist.md | 22 +++
service-rpc/src/main/thrift/rpc.thrift | 174 +++++++++------------
3 files changed, 132 insertions(+), 124 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4eeb357..b18e38c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.service;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -90,22 +92,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
protected QueryProcessor processor;
// Record the username for every rpc connection. Username.get() is null if
// login is failed.
- protected ThreadLocal<String> username = new ThreadLocal<>();
-
- // The statementId is unique in one session for each statement.
- private ThreadLocal<Long> statementIdGenerator = new ThreadLocal<>();
- // The operationIdGenerator is unique in one session for each operation.
- private ThreadLocal<Long> operationIdGenerator = new ThreadLocal<>();
+ protected Map<Long, String> usernameMap = new ConcurrentHashMap<>();
+ private Map<Long, ZoneId> zoneIds = new ConcurrentHashMap<>();
+
+ // The sessionId is unique in one IoTDB instance.
+ private AtomicLong sessionIdGenerator = new AtomicLong();
+ // The statementId is unique in one IoTDB instance.
+ private AtomicLong statementIdGenerator = new AtomicLong();
+ // The queryId is unique in one IoTDB instance for each operation that needs an id.
+ private AtomicLong queryIdGenerator = new AtomicLong();
// (statement -> Set(queryId))
- private ThreadLocal<Map<Long, Set<Long>>> statementId2QueryId = new ThreadLocal<>();
+ private Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
// (queryId -> PhysicalPlan)
- private ThreadLocal<Map<Long, PhysicalPlan>> queryId2Plan = new ThreadLocal<>();
+ private Map<Long, PhysicalPlan> queryId2Plan = new ConcurrentHashMap<>();
// (queryId -> QueryDataSet)
- private ThreadLocal<Map<Long, QueryDataSet>> queryId2DataSet = new ThreadLocal<>();
- private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
+ private Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+ // (queryId -> QueryContext)
+ private Map<Long, QueryContext> contextMapLocal = new ConcurrentHashMap<>();
+
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
-
+
public TSServiceImpl() {
processor = new QueryProcessor(new QueryProcessExecutor());
}
@@ -191,7 +197,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private void initForOneSession() {
queryId2Plan.set(new HashMap<>());
queryId2DataSet.set(new HashMap<>());
- operationIdGenerator.set(0L);
+ queryIdGenerator.set(0L);
statementIdGenerator.set(0L);
contextMapLocal.set(new HashMap<>());
statementId2QueryId.set(new HashMap<>());
@@ -215,8 +221,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
statementIdGenerator.remove();
}
// clear the queryId counter
- if (operationIdGenerator.get() != null) {
- operationIdGenerator.remove();
+ if (queryIdGenerator.get() != null) {
+ queryIdGenerator.remove();
}
// clear all cached physical plans of the connection
if (queryId2Plan.get() != null) {
@@ -608,16 +614,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} // else default ignoreTimeStamp is false
resp.setOperationType(plan.getOperatorType().toString());
// generate the queryId for the operation
- long queryId = generateOperationId();
+ long queryId = generateQueryId();
// put it into the corresponding Set
Set<Long> queryIdSet = statementId2QueryId.get()
.computeIfAbsent(statementId, k -> new HashSet<>());
queryIdSet.add(queryId);
- TSHandleIdentifier operationId = new TSHandleIdentifier(
+ TSHandleIdentifier queryId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()),
queryId);
- TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
+ TSOperationHandle operationHandle = new TSOperationHandle(queryId, true);
resp.setOperationHandle(operationHandle);
queryId2Plan.get().put(queryId, plan);
@@ -964,12 +970,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
status = executePlan(plan);
TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
- long queryId = generateOperationId();
- TSHandleIdentifier operationId = new TSHandleIdentifier(
+ long queryId = generateQueryId();
+ TSHandleIdentifier queryId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
ByteBuffer.wrap("PASS".getBytes()), queryId);
TSOperationHandle operationHandle;
- operationHandle = new TSOperationHandle(operationId, false);
+ operationHandle = new TSOperationHandle(queryId, false);
resp.setOperationHandle(operationHandle);
return resp;
}
@@ -1021,10 +1027,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
TSExecuteStatementResp resp = new TSExecuteStatementResp();
TSStatus tsStatus = new TSStatus(status);
resp.setStatus(tsStatus);
- TSHandleIdentifier operationId = new TSHandleIdentifier(
+ TSHandleIdentifier queryId = new TSHandleIdentifier(
ByteBuffer.wrap(username.get().getBytes()),
- ByteBuffer.wrap("PASS".getBytes()), generateOperationId());
- TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
+ ByteBuffer.wrap("PASS".getBytes()), generateQueryId());
+ TSOperationHandle operationHandle = new TSOperationHandle(queryId, false);
resp.setOperationHandle(operationHandle);
return resp;
}
@@ -1369,9 +1375,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
: getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
- private long generateOperationId() {
- long queryId = operationIdGenerator.get();
- operationIdGenerator.set(queryId + 1);
+ private long generateQueryId() {
+ long queryId = queryIdGenerator.get();
+ queryIdGenerator.set(queryId + 1);
return queryId;
}
}
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 4e722f8..53e38c1 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -84,4 +84,26 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
+# 0.10.0 (version-1) -> version-2
+Last Updated on November 12th, 2019 by Tian Jiang.
+
+
+## 1. Delete Old
+
+| Latest Changes | Related Committers |
+| ---------------------------------- | ------------------ |
+| Remove TS_SessionHandle,TSHandleIdentifier | Tian Jiang |
+
+## 2. Add New
+
+| Latest Changes | Related Committers |
+| ------------------------------------------------------------ | ---------------------------------- |
+| | |
+
+## 3. Update
+
+| Latest Changes | Related Committers |
+| ------------------------------------------------------------ | ---------------------- |
+| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds | Tian Jiang |
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index d7a123d..5c4fe5d 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -31,37 +31,9 @@ struct TSStatus {
3: optional string sqlState // as defined in the ISO/IEC CLI specification
}
-struct TSHandleIdentifier {
- // 16 byte globally unique identifier This is the public ID of the handle and can be used for reporting.
- // In current version, it is not used.
- 1: required binary guid,
-
- // 16 byte secret generated by the server and used to verify that the handle is not being hijacked by another user.
- // In current version, it is not used.
- 2: required binary secret,
-
- // unique identifier in session. This is a ID to identify a query in one session.
- 3: required i64 queryId,
-}
-
-// Client-side reference to a task running asynchronously on the server.
-struct TSOperationHandle {
- 1: required TSHandleIdentifier operationId
-
- // If hasResultSet = TRUE, then this operation
- // generates a result set that can be fetched.
- // Note that the result set may be empty.
- //
- // If hasResultSet = FALSE, then this operation
- // does not generate a result set, and calling
- // GetResultSetMetadata or FetchResults against
- // this OperationHandle will generate an error.
- 2: required bool hasResultSet
-}
-
struct TSExecuteStatementResp {
1: required TSStatus status
- 2: optional TSOperationHandle operationHandle
+ 2: optional i64 queryId
// Column names in select statement of SQL
3: optional list<string> columns
4: optional string operationType
@@ -74,21 +46,14 @@ enum TSProtocolVersion {
IOTDB_SERVICE_PROTOCOL_V1,
}
-// Client-side handle to persistent session information on the server-side.
-// In current version, it is not used.
-struct TS_SessionHandle {
- 1: required TSHandleIdentifier sessionId
-}
-
-
struct TSOpenSessionResp {
1: required TSStatus status
// The protocol version that the server is using.
2: required TSProtocolVersion serverProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1
- // Session Handle
- 3: optional TS_SessionHandle sessionHandle
+ // Session id
+ 3: optional i64 sessionId
// The configuration settings for this session.
4: optional map<string, string> configuration
@@ -103,11 +68,15 @@ struct TSOpenSessionReq {
4: optional map<string, string> configuration
}
+struct TSAuthenticatedReq {
+ 1: required i64 sessionId
+}
+
// CloseSession()
// Closes the specified session and frees any resources currently allocated to that session.
// Any open operations in that session will be canceled.
struct TSCloseSessionReq {
- 1: required TS_SessionHandle sessionHandle
+ 1: required i64 sessionId
}
// ExecuteStatement()
@@ -117,7 +86,7 @@ struct TSCloseSessionReq {
// statement has finished executing.
struct TSExecuteStatementReq {
// The session to execute the statement against
- 1: required TS_SessionHandle sessionHandle
+ 1: required i64 sessionId
// The statement to be executed (DML, DDL, SET, etc)
2: required string statement
@@ -127,57 +96,51 @@ struct TSExecuteStatementReq {
}
struct TSExecuteInsertRowInBatchResp{
- 1: required list<TSStatus> statusList
+ 1: required i64 sessionId
+ 2: required list<TSStatus> statusList
}
struct TSExecuteBatchStatementResp{
- 1: required TSStatus status
+ 1: required i64 sessionId
+ 2: required TSStatus status
// For each value in result, Statement.SUCCESS_NO_INFO represents success; otherwise Statement.EXECUTE_FAILED represents failure.
- 2: optional list<i32> result
+ 3: optional list<i32> result
}
struct TSExecuteBatchStatementReq{
// The session to execute the statement against
- 1: required TS_SessionHandle sessionHandle
+ 1: required i64 sessionId
// The statements to be executed (DML, DDL, SET, etc)
2: required list<string> statements
}
-
struct TSGetOperationStatusReq {
+ 1: required i64 sessionId
// Session to run this request against
- 1: required TSOperationHandle operationHandle
+ 2: required i64 queryId
}
// CancelOperation()
//
// Cancels processing on the specified operation handle and frees any resources which were allocated.
struct TSCancelOperationReq {
+ 1: required i64 sessionId
// Operation to cancel
- 1: required TSOperationHandle operationHandle
+ 2: required i64 queryId
}
// CloseOperation()
struct TSCloseOperationReq {
- 1: required TSOperationHandle operationHandle
+ 1: required i64 sessionId
2: required i64 queryId
- 3: optional i64 stmtId
-}
-
-struct TSQueryDataSet{
- // ByteBuffer for time column
- 1: required binary time
- // ByteBuffer for each column values
- 2: required list<binary> valueList
- // Bitmap for each column to indicate whether it is a null value
- 3: required list<binary> bitmapList
}
struct TSFetchResultsReq{
- 1: required string statement
- 2: required i32 fetchSize
- 3: required i64 queryId
+ 1: required i64 sessionId
+ 2: required string statement
+ 3: required i32 fetchSize
+ 4: required i64 queryId
}
struct TSFetchResultsResp{
@@ -201,16 +164,10 @@ struct TSFetchMetadataResp{
}
struct TSFetchMetadataReq{
- 1: required string type
- 2: optional string columnPath
- 3: optional i32 nodeLevel
-}
-
-struct TSColumnSchema{
- 1: optional string name;
- 2: optional string dataType;
- 3: optional string encoding;
- 4: optional map<string, string> otherArgs;
+ 1: required i64 sessionId
+ 2: required string type
+ 3: optional string columnPath
+ 4: optional i32 nodeLevel
}
struct TSGetTimeZoneResp {
@@ -219,52 +176,59 @@ struct TSGetTimeZoneResp {
}
struct TSSetTimeZoneReq {
- 1: required string timeZone
+ 1: required i64 sessionId
+ 2: required string timeZone
}
// for prepared statement
struct TSInsertionReq {
- 1: optional string deviceId
- 2: optional list<string> measurements
- 3: optional list<string> values
- 4: optional i64 timestamp
- 5: required i64 stmtId
+ 1: required i64 sessionId
+ 2: optional string deviceId
+ 3: optional list<string> measurements
+ 4: optional list<string> values
+ 5: optional i64 timestamp
+ 6: required i64 stmtId
}
// for session
struct TSInsertReq {
- 1: required string deviceId
- 2: required list<string> measurements
- 3: required list<string> values
- 4: required i64 timestamp
+ 1: required i64 sessionId
+ 2: required string deviceId
+ 3: required list<string> measurements
+ 4: required list<string> values
+ 5: required i64 timestamp
}
struct TSBatchInsertionReq {
- 1: required string deviceId
- 2: required list<string> measurements
- 3: required binary values
- 4: required binary timestamps
- 5: required list<i32> types
- 6: required i32 size
+ 1: required i64 sessionId
+ 2: required string deviceId
+ 3: required list<string> measurements
+ 4: required binary values
+ 5: required binary timestamps
+ 6: required list<i32> types
+ 7: required i32 size
}
struct TSInsertInBatchReq {
- 1: required list<string> deviceIds
- 2: required list<list<string>> measurementsList
- 3: required list<list<string>> valuesList
- 4: required list<i64> timestamps
+ 1: required i64 sessionId
+ 2: required list<string> deviceIds
+ 3: required list<list<string>> measurementsList
+ 4: required list<list<string>> valuesList
+ 5: required list<i64> timestamps
}
struct TSDeleteDataReq {
- 1: required list<string> paths
- 2: required i64 timestamp
+ 1: required i64 sessionId
+ 2: required list<string> paths
+ 3: required i64 timestamp
}
struct TSCreateTimeseriesReq {
- 1: required string path
- 2: required i32 dataType
- 3: required i32 encoding
- 4: required i32 compressor
+ 1: required i64 sessionId
+ 2: required string path
+ 3: required i32 dataType
+ 4: required i32 encoding
+ 5: required i32 compressor
}
struct ServerProperties {
@@ -273,6 +237,22 @@ struct ServerProperties {
3: required string timestampPrecision;
}
+struct TSQueryDataSet{
+ // ByteBuffer for time column
+ 1: required binary time
+ // ByteBuffer for each column values
+ 2: required list<binary> valueList
+ // Bitmap for each column to indicate whether it is a null value
+ 3: required list<binary> bitmapList
+}
+
+struct TSColumnSchema{
+ 1: optional string name;
+ 2: optional string dataType;
+ 3: optional string encoding;
+ 4: optional map<string, string> otherArgs;
+}
+
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);