You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/12 03:15:05 UTC

[incubator-iotdb] 01/01: update thrift file

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_session_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7480e606d5764f671dbc2ae0ea44b94a2a6d95dd
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Dec 12 11:14:43 2019 +0800

    update thrift file
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  60 +++----
 service-rpc/rpc-changelist.md                      |  22 +++
 service-rpc/src/main/thrift/rpc.thrift             | 174 +++++++++------------
 3 files changed, 132 insertions(+), 124 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4eeb357..b18e38c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.service;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -90,22 +92,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
-  protected ThreadLocal<String> username = new ThreadLocal<>();
-
-  // The statementId is unique in one session for each statement.
-  private ThreadLocal<Long> statementIdGenerator = new ThreadLocal<>();
-  // The operationIdGenerator is unique in one session for each operation.
-  private ThreadLocal<Long> operationIdGenerator = new ThreadLocal<>();
+  protected Map<Long, String> usernameMap = new ConcurrentHashMap<>();
+  private Map<Long, ZoneId> zoneIds = new ConcurrentHashMap<>();
+  
+  // The sessionId is unique in one IoTDB instance.
+  private AtomicLong sessionIdGenerator = new AtomicLong();
+  // The statementId is unique in one IoTDB instance.
+  private AtomicLong statementIdGenerator = new AtomicLong();
+  // The queryIdGenerator is unique in one IoTDB instance for each operation that needs an id.
+  private AtomicLong queryIdGenerator = new AtomicLong();
   // (statement -> Set(queryId))
-  private ThreadLocal<Map<Long, Set<Long>>> statementId2QueryId = new ThreadLocal<>();
+  private Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
   // (queryId -> PhysicalPlan)
-  private ThreadLocal<Map<Long, PhysicalPlan>> queryId2Plan = new ThreadLocal<>();
+  private Map<Long, PhysicalPlan> queryId2Plan = new ConcurrentHashMap<>();
   // (queryId -> QueryDataSet)
-  private ThreadLocal<Map<Long, QueryDataSet>> queryId2DataSet = new ThreadLocal<>();
-  private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
+  private Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+  // (queryId -> QueryContext)
+  private Map<Long, QueryContext> contextMapLocal = new ConcurrentHashMap<>();
+  
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
-
+  
   public TSServiceImpl() {
     processor = new QueryProcessor(new QueryProcessExecutor());
   }
@@ -191,7 +197,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private void initForOneSession() {
     queryId2Plan.set(new HashMap<>());
     queryId2DataSet.set(new HashMap<>());
-    operationIdGenerator.set(0L);
+    queryIdGenerator.set(0L);
     statementIdGenerator.set(0L);
     contextMapLocal.set(new HashMap<>());
     statementId2QueryId.set(new HashMap<>());
@@ -215,8 +221,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       statementIdGenerator.remove();
     }
     // clear the queryId counter
-    if (operationIdGenerator.get() != null) {
-      operationIdGenerator.remove();
+    if (queryIdGenerator.get() != null) {
+      queryIdGenerator.remove();
     }
     // clear all cached physical plans of the connection
     if (queryId2Plan.get() != null) {
@@ -608,16 +614,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
       // generate the queryId for the operation
-      long queryId = generateOperationId();
+      long queryId = generateQueryId();
       // put it into the corresponding Set
       Set<Long> queryIdSet = statementId2QueryId.get()
           .computeIfAbsent(statementId, k -> new HashSet<>());
       queryIdSet.add(queryId);
 
-      TSHandleIdentifier operationId = new TSHandleIdentifier(
+      TSHandleIdentifier queryId = new TSHandleIdentifier(
           ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()),
           queryId);
-      TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
+      TSOperationHandle operationHandle = new TSOperationHandle(queryId, true);
       resp.setOperationHandle(operationHandle);
 
       queryId2Plan.get().put(queryId, plan);
@@ -964,12 +970,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     status = executePlan(plan);
     TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
-    long queryId = generateOperationId();
-    TSHandleIdentifier operationId = new TSHandleIdentifier(
+    long queryId = generateQueryId();
+    TSHandleIdentifier queryId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
         ByteBuffer.wrap("PASS".getBytes()), queryId);
     TSOperationHandle operationHandle;
-    operationHandle = new TSOperationHandle(operationId, false);
+    operationHandle = new TSOperationHandle(queryId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
   }
@@ -1021,10 +1027,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     TSExecuteStatementResp resp = new TSExecuteStatementResp();
     TSStatus tsStatus = new TSStatus(status);
     resp.setStatus(tsStatus);
-    TSHandleIdentifier operationId = new TSHandleIdentifier(
+    TSHandleIdentifier queryId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()), generateOperationId());
-    TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
+        ByteBuffer.wrap("PASS".getBytes()), generateQueryId());
+    TSOperationHandle operationHandle = new TSOperationHandle(queryId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
   }
@@ -1369,9 +1375,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateOperationId() {
-    long queryId = operationIdGenerator.get();
-    operationIdGenerator.set(queryId + 1);
+  private long generateQueryId() {
+    long queryId = queryIdGenerator.get();
+    queryIdGenerator.set(queryId + 1);
     return queryId;
   }
 }
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 4e722f8..53e38c1 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -84,4 +84,26 @@ Last Updated on October 27th, 2019 by Lei Rui.
 | Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
 
 
+# 0.10.0 (version-1) -> version-2
 
+Last Updated on November 12th, 2019 by Tian Jiang.
+
+
+## 1. Delete Old
+
+| Latest Changes                     | Related Committers |
+| ---------------------------------- | ------------------ |
+| Remove TS_SessionHandle,TSHandleIdentifier            | Tian Jiang         |
+
+## 2. Add New
+
+| Latest Changes                                               | Related Committers                 |
+| ------------------------------------------------------------ | ---------------------------------- |
+|                            |                       |
+
+## 3. Update
+
+| Latest Changes                                               | Related Committers     |
+| ------------------------------------------------------------ | ---------------------- |
+| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds  | Tian 
+Jiang  |
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index d7a123d..5c4fe5d 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -31,37 +31,9 @@ struct TSStatus {
   3: optional string sqlState  // as defined in the ISO/IEF CLIENT specification
 }
 
-struct TSHandleIdentifier {
-  // 16 byte globally unique identifier This is the public ID of the handle and can be used for reporting.
-  // In current version, it is not used.
-  1: required binary guid,
-
-  // 16 byte secret generated by the server and used to verify that the handle is not being hijacked by another user.
-  // In current version, it is not used.
-  2: required binary secret,
-
-  // unique identifier in session. This is a ID to identify a query in one session.
-  3: required i64 queryId,
-}
-
-// Client-side reference to a task running asynchronously on the server.
-struct TSOperationHandle {
-  1: required TSHandleIdentifier operationId
-
-  // If hasResultSet = TRUE, then this operation
-  // generates a result set that can be fetched.
-  // Note that the result set may be empty.
-  //
-  // If hasResultSet = FALSE, then this operation
-  // does not generate a result set, and calling
-  // GetResultSetMetadata or FetchResults against
-  // this OperationHandle will generate an error.
-  2: required bool hasResultSet
-}
-
 struct TSExecuteStatementResp {
 	1: required TSStatus status
-	2: optional TSOperationHandle operationHandle
+	2: optional i64 queryId
   // Column names in select statement of SQL
 	3: optional list<string> columns
 	4: optional string operationType
@@ -74,21 +46,14 @@ enum TSProtocolVersion {
   IOTDB_SERVICE_PROTOCOL_V1,
 }
 
-// Client-side handle to persistent session information on the server-side.
-// In current version, it is not used.
-struct TS_SessionHandle {
-  1: required TSHandleIdentifier sessionId
-}
-
-
 struct TSOpenSessionResp {
   1: required TSStatus status
 
   // The protocol version that the server is using.
   2: required TSProtocolVersion serverProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1
 
-  // Session Handle
-  3: optional TS_SessionHandle sessionHandle
+  // Session id
+  3: optional i64 sessionId
 
   // The configuration settings for this session.
   4: optional map<string, string> configuration
@@ -103,11 +68,15 @@ struct TSOpenSessionReq {
   4: optional map<string, string> configuration
 }
 
+struct TSAuthenticatedReq {
+  1: required i64 sessionId
+}
+
 // CloseSession()
 // Closes the specified session and frees any resources currently allocated to that session.
 // Any open operations in that session will be canceled.
 struct TSCloseSessionReq {
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 }
 
 // ExecuteStatement()
@@ -117,7 +86,7 @@ struct TSCloseSessionReq {
 // statement has finished executing.
 struct TSExecuteStatementReq {
   // The session to execute the statement against
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 
   // The statement to be executed (DML, DDL, SET, etc)
   2: required string statement
@@ -127,57 +96,51 @@ struct TSExecuteStatementReq {
 }
 
 struct TSExecuteInsertRowInBatchResp{
-	1: required list<TSStatus> statusList
+  1: required i64 sessionId
+	2: required list<TSStatus> statusList
 }
 
 struct TSExecuteBatchStatementResp{
-	1: required TSStatus status
+  1: required i64 sessionId
+	2: required TSStatus status
   // For each value in result, Statement.SUCCESS_NO_INFO represents success, Statement.EXECUTE_FAILED represents fail otherwise.
-	2: optional list<i32> result
+	3: optional list<i32> result
 }
 
 struct TSExecuteBatchStatementReq{
   // The session to execute the statement against
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 
   // The statements to be executed (DML, DDL, SET, etc)
   2: required list<string> statements
 }
 
-
 struct TSGetOperationStatusReq {
+  1: required i64 sessionId
   // Session to run this request against
-  1: required TSOperationHandle operationHandle
+  2: required i64 queryId
 }
 
 // CancelOperation()
 //
 // Cancels processing on the specified operation handle and frees any resources which were allocated.
 struct TSCancelOperationReq {
+  1: required i64 sessionId
   // Operation to cancel
-  1: required TSOperationHandle operationHandle
+  2: required i64 queryId
 }
 
 // CloseOperation()
 struct TSCloseOperationReq {
-  1: required TSOperationHandle operationHandle
+  1: required i64 sessionId
   2: required i64 queryId
-  3: optional i64 stmtId
-}
-
-struct TSQueryDataSet{
-   // ByteBuffer for time column
-   1: required binary time
-   // ByteBuffer for each column values
-   2: required list<binary> valueList
-   // Bitmap for each column to indicate whether it is a null value
-   3: required list<binary> bitmapList
 }
 
 struct TSFetchResultsReq{
-	1: required string statement
-	2: required i32 fetchSize
-	3: required i64 queryId
+  1: required i64 sessionId
+	2: required string statement
+	3: required i32 fetchSize
+	4: required i64 queryId
 }
 
 struct TSFetchResultsResp{
@@ -201,16 +164,10 @@ struct TSFetchMetadataResp{
 }
 
 struct TSFetchMetadataReq{
-		1: required string type
-		2: optional string columnPath
-		3: optional i32 nodeLevel
-}
-
-struct TSColumnSchema{
-	1: optional string name;
-	2: optional string dataType;
-	3: optional string encoding;
-	4: optional map<string, string> otherArgs;
+    1: required i64 sessionId
+		2: required string type
+		3: optional string columnPath
+		4: optional i32 nodeLevel
 }
 
 struct TSGetTimeZoneResp {
@@ -219,52 +176,59 @@ struct TSGetTimeZoneResp {
 }
 
 struct TSSetTimeZoneReq {
-    1: required string timeZone
+    1: required i64 sessionId
+    2: required string timeZone
 }
 
 // for prepared statement
 struct TSInsertionReq {
-    1: optional string deviceId
-    2: optional list<string> measurements
-    3: optional list<string> values
-    4: optional i64 timestamp
-    5: required i64 stmtId
+    1: required i64 sessionId
+    2: optional string deviceId
+    3: optional list<string> measurements
+    4: optional list<string> values
+    5: optional i64 timestamp
+    6: required i64 stmtId
 }
 
 // for session
 struct TSInsertReq {
-    1: required string deviceId
-    2: required list<string> measurements
-    3: required list<string> values
-    4: required i64 timestamp
+    1: required i64 sessionId
+    2: required string deviceId
+    3: required list<string> measurements
+    4: required list<string> values
+    5: required i64 timestamp
 }
 
 struct TSBatchInsertionReq {
-    1: required string deviceId
-    2: required list<string> measurements
-    3: required binary values
-    4: required binary timestamps
-    5: required list<i32> types
-    6: required i32 size
+    1: required i64 sessionId
+    2: required string deviceId
+    3: required list<string> measurements
+    4: required binary values
+    5: required binary timestamps
+    6: required list<i32> types
+    7: required i32 size
 }
 
 struct TSInsertInBatchReq {
-    1: required list<string> deviceIds
-    2: required list<list<string>> measurementsList
-    3: required list<list<string>> valuesList
-    4: required list<i64> timestamps
+    1: required i64 sessionId
+    2: required list<string> deviceIds
+    3: required list<list<string>> measurementsList
+    4: required list<list<string>> valuesList
+    5: required list<i64> timestamps
 }
 
 struct TSDeleteDataReq {
-    1: required list<string> paths
-    2: required i64 timestamp
+    1: required i64 sessionId
+    2: required list<string> paths
+    3: required i64 timestamp
 }
 
 struct TSCreateTimeseriesReq {
-  1: required string path
-  2: required i32 dataType
-  3: required i32 encoding
-  4: required i32 compressor
+  1: required i64 sessionId
+  2: required string path
+  3: required i32 dataType
+  4: required i32 encoding
+  5: required i32 compressor
 }
 
 struct ServerProperties {
@@ -273,6 +237,22 @@ struct ServerProperties {
 	3: required string timestampPrecision;
 }
 
+struct TSQueryDataSet{
+   // ByteBuffer for time column
+   1: required binary time
+   // ByteBuffer for each column values
+   2: required list<binary> valueList
+   // Bitmap for each column to indicate whether it is a null value
+   3: required list<binary> bitmapList
+}
+
+struct TSColumnSchema{
+	1: optional string name;
+	2: optional string dataType;
+	3: optional string encoding;
+	4: optional map<string, string> otherArgs;
+}
+
 service TSIService {
 	TSOpenSessionResp openSession(1:TSOpenSessionReq req);