You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/12 03:15:04 UTC

[incubator-iotdb] branch refactor_session_management created (now 7480e60)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch refactor_session_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 7480e60  update thrift file

This branch includes the following new commits:

     new 7480e60  update thrift file

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: update thrift file

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_session_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7480e606d5764f671dbc2ae0ea44b94a2a6d95dd
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Dec 12 11:14:43 2019 +0800

    update thrift file
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  60 +++----
 service-rpc/rpc-changelist.md                      |  22 +++
 service-rpc/src/main/thrift/rpc.thrift             | 174 +++++++++------------
 3 files changed, 132 insertions(+), 124 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4eeb357..b18e38c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.service;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -90,22 +92,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   protected QueryProcessor processor;
   // Record the username for every rpc connection. Username.get() is null if
   // login is failed.
-  protected ThreadLocal<String> username = new ThreadLocal<>();
-
-  // The statementId is unique in one session for each statement.
-  private ThreadLocal<Long> statementIdGenerator = new ThreadLocal<>();
-  // The operationIdGenerator is unique in one session for each operation.
-  private ThreadLocal<Long> operationIdGenerator = new ThreadLocal<>();
+  protected Map<Long, String> usernameMap = new ConcurrentHashMap<>();
+  private Map<Long, ZoneId> zoneIds = new ConcurrentHashMap<>();
+  
+  // The sessionId is unique in one IoTDB instance.
+  private AtomicLong sessionIdGenerator = new AtomicLong();
+  // The statementId is unique in one IoTDB instance.
+  private AtomicLong statementIdGenerator = new AtomicLong();
+  // The queryIdGenerator is unique in one IoTDB instance for each operation that needs an id.
+  private AtomicLong queryIdGenerator = new AtomicLong();
   // (statement -> Set(queryId))
-  private ThreadLocal<Map<Long, Set<Long>>> statementId2QueryId = new ThreadLocal<>();
+  private Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
   // (queryId -> PhysicalPlan)
-  private ThreadLocal<Map<Long, PhysicalPlan>> queryId2Plan = new ThreadLocal<>();
+  private Map<Long, PhysicalPlan> queryId2Plan = new ConcurrentHashMap<>();
   // (queryId -> QueryDataSet)
-  private ThreadLocal<Map<Long, QueryDataSet>> queryId2DataSet = new ThreadLocal<>();
-  private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
+  private Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+  // (queryId -> QueryContext)
+  private Map<Long, QueryContext> contextMapLocal = new ConcurrentHashMap<>();
+  
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private ThreadLocal<Map<Long, QueryContext>> contextMapLocal = new ThreadLocal<>();
-
+  
   public TSServiceImpl() {
     processor = new QueryProcessor(new QueryProcessExecutor());
   }
@@ -191,7 +197,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private void initForOneSession() {
     queryId2Plan.set(new HashMap<>());
     queryId2DataSet.set(new HashMap<>());
-    operationIdGenerator.set(0L);
+    queryIdGenerator.set(0L);
     statementIdGenerator.set(0L);
     contextMapLocal.set(new HashMap<>());
     statementId2QueryId.set(new HashMap<>());
@@ -215,8 +221,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       statementIdGenerator.remove();
     }
     // clear the queryId counter
-    if (operationIdGenerator.get() != null) {
-      operationIdGenerator.remove();
+    if (queryIdGenerator.get() != null) {
+      queryIdGenerator.remove();
     }
     // clear all cached physical plans of the connection
     if (queryId2Plan.get() != null) {
@@ -608,16 +614,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       } // else default ignoreTimeStamp is false
       resp.setOperationType(plan.getOperatorType().toString());
       // generate the queryId for the operation
-      long queryId = generateOperationId();
+      long queryId = generateQueryId();
       // put it into the corresponding Set
       Set<Long> queryIdSet = statementId2QueryId.get()
           .computeIfAbsent(statementId, k -> new HashSet<>());
       queryIdSet.add(queryId);
 
-      TSHandleIdentifier operationId = new TSHandleIdentifier(
+      TSHandleIdentifier queryId = new TSHandleIdentifier(
           ByteBuffer.wrap(username.get().getBytes()), ByteBuffer.wrap("PASS".getBytes()),
           queryId);
-      TSOperationHandle operationHandle = new TSOperationHandle(operationId, true);
+      TSOperationHandle operationHandle = new TSOperationHandle(queryId, true);
       resp.setOperationHandle(operationHandle);
 
       queryId2Plan.get().put(queryId, plan);
@@ -964,12 +970,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     status = executePlan(plan);
     TSExecuteStatementResp resp = getTSExecuteStatementResp(status);
-    long queryId = generateOperationId();
-    TSHandleIdentifier operationId = new TSHandleIdentifier(
+    long queryId = generateQueryId();
+    TSHandleIdentifier queryId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
         ByteBuffer.wrap("PASS".getBytes()), queryId);
     TSOperationHandle operationHandle;
-    operationHandle = new TSOperationHandle(operationId, false);
+    operationHandle = new TSOperationHandle(queryId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
   }
@@ -1021,10 +1027,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     TSExecuteStatementResp resp = new TSExecuteStatementResp();
     TSStatus tsStatus = new TSStatus(status);
     resp.setStatus(tsStatus);
-    TSHandleIdentifier operationId = new TSHandleIdentifier(
+    TSHandleIdentifier queryId = new TSHandleIdentifier(
         ByteBuffer.wrap(username.get().getBytes()),
-        ByteBuffer.wrap("PASS".getBytes()), generateOperationId());
-    TSOperationHandle operationHandle = new TSOperationHandle(operationId, false);
+        ByteBuffer.wrap("PASS".getBytes()), generateQueryId());
+    TSOperationHandle operationHandle = new TSOperationHandle(queryId, false);
     resp.setOperationHandle(operationHandle);
     return resp;
   }
@@ -1369,9 +1375,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateOperationId() {
-    long queryId = operationIdGenerator.get();
-    operationIdGenerator.set(queryId + 1);
+  private long generateQueryId() {
+    long queryId = queryIdGenerator.get();
+    queryIdGenerator.set(queryId + 1);
     return queryId;
   }
 }
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 4e722f8..53e38c1 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -84,4 +84,26 @@ Last Updated on October 27th, 2019 by Lei Rui.
 | Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
 
 
+# 0.10.0 (version-1) -> version-2
 
+Last Updated on November 12th, 2019 by Tian Jiang.
+
+
+## 1. Delete Old
+
+| Latest Changes                     | Related Committers |
+| ---------------------------------- | ------------------ |
+| Remove TS_SessionHandle,TSHandleIdentifier            | Tian Jiang         |
+
+## 2. Add New
+
+| Latest Changes                                               | Related Committers                 |
+| ------------------------------------------------------------ | ---------------------------------- |
+|                            |                       |
+
+## 3. Update
+
+| Latest Changes                                               | Related Committers     |
+| ------------------------------------------------------------ | ---------------------- |
+| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds  | Tian 
+Jiang  |
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index d7a123d..5c4fe5d 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -31,37 +31,9 @@ struct TSStatus {
   3: optional string sqlState  // as defined in the ISO/IEF CLIENT specification
 }
 
-struct TSHandleIdentifier {
-  // 16 byte globally unique identifier This is the public ID of the handle and can be used for reporting.
-  // In current version, it is not used.
-  1: required binary guid,
-
-  // 16 byte secret generated by the server and used to verify that the handle is not being hijacked by another user.
-  // In current version, it is not used.
-  2: required binary secret,
-
-  // unique identifier in session. This is a ID to identify a query in one session.
-  3: required i64 queryId,
-}
-
-// Client-side reference to a task running asynchronously on the server.
-struct TSOperationHandle {
-  1: required TSHandleIdentifier operationId
-
-  // If hasResultSet = TRUE, then this operation
-  // generates a result set that can be fetched.
-  // Note that the result set may be empty.
-  //
-  // If hasResultSet = FALSE, then this operation
-  // does not generate a result set, and calling
-  // GetResultSetMetadata or FetchResults against
-  // this OperationHandle will generate an error.
-  2: required bool hasResultSet
-}
-
 struct TSExecuteStatementResp {
 	1: required TSStatus status
-	2: optional TSOperationHandle operationHandle
+	2: optional i64 queryId
   // Column names in select statement of SQL
 	3: optional list<string> columns
 	4: optional string operationType
@@ -74,21 +46,14 @@ enum TSProtocolVersion {
   IOTDB_SERVICE_PROTOCOL_V1,
 }
 
-// Client-side handle to persistent session information on the server-side.
-// In current version, it is not used.
-struct TS_SessionHandle {
-  1: required TSHandleIdentifier sessionId
-}
-
-
 struct TSOpenSessionResp {
   1: required TSStatus status
 
   // The protocol version that the server is using.
   2: required TSProtocolVersion serverProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1
 
-  // Session Handle
-  3: optional TS_SessionHandle sessionHandle
+  // Session id
+  3: optional i64 sessionId
 
   // The configuration settings for this session.
   4: optional map<string, string> configuration
@@ -103,11 +68,15 @@ struct TSOpenSessionReq {
   4: optional map<string, string> configuration
 }
 
+struct TSAuthenticatedReq {
+  1: required i64 sessionId
+}
+
 // CloseSession()
 // Closes the specified session and frees any resources currently allocated to that session.
 // Any open operations in that session will be canceled.
 struct TSCloseSessionReq {
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 }
 
 // ExecuteStatement()
@@ -117,7 +86,7 @@ struct TSCloseSessionReq {
 // statement has finished executing.
 struct TSExecuteStatementReq {
   // The session to execute the statement against
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 
   // The statement to be executed (DML, DDL, SET, etc)
   2: required string statement
@@ -127,57 +96,51 @@ struct TSExecuteStatementReq {
 }
 
 struct TSExecuteInsertRowInBatchResp{
-	1: required list<TSStatus> statusList
+  1: required i64 sessionId
+	2: required list<TSStatus> statusList
 }
 
 struct TSExecuteBatchStatementResp{
-	1: required TSStatus status
+  1: required i64 sessionId
+	2: required TSStatus status
   // For each value in result, Statement.SUCCESS_NO_INFO represents success, Statement.EXECUTE_FAILED represents fail otherwise.
-	2: optional list<i32> result
+	3: optional list<i32> result
 }
 
 struct TSExecuteBatchStatementReq{
   // The session to execute the statement against
-  1: required TS_SessionHandle sessionHandle
+  1: required i64 sessionId
 
   // The statements to be executed (DML, DDL, SET, etc)
   2: required list<string> statements
 }
 
-
 struct TSGetOperationStatusReq {
+  1: required i64 sessionId
   // Session to run this request against
-  1: required TSOperationHandle operationHandle
+  2: required i64 queryId
 }
 
 // CancelOperation()
 //
 // Cancels processing on the specified operation handle and frees any resources which were allocated.
 struct TSCancelOperationReq {
+  1: required i64 sessionId
   // Operation to cancel
-  1: required TSOperationHandle operationHandle
+  2: required i64 queryId
 }
 
 // CloseOperation()
 struct TSCloseOperationReq {
-  1: required TSOperationHandle operationHandle
+  1: required i64 sessionId
   2: required i64 queryId
-  3: optional i64 stmtId
-}
-
-struct TSQueryDataSet{
-   // ByteBuffer for time column
-   1: required binary time
-   // ByteBuffer for each column values
-   2: required list<binary> valueList
-   // Bitmap for each column to indicate whether it is a null value
-   3: required list<binary> bitmapList
 }
 
 struct TSFetchResultsReq{
-	1: required string statement
-	2: required i32 fetchSize
-	3: required i64 queryId
+  1: required i64 sessionId
+	2: required string statement
+	3: required i32 fetchSize
+	4: required i64 queryId
 }
 
 struct TSFetchResultsResp{
@@ -201,16 +164,10 @@ struct TSFetchMetadataResp{
 }
 
 struct TSFetchMetadataReq{
-		1: required string type
-		2: optional string columnPath
-		3: optional i32 nodeLevel
-}
-
-struct TSColumnSchema{
-	1: optional string name;
-	2: optional string dataType;
-	3: optional string encoding;
-	4: optional map<string, string> otherArgs;
+    1: required i64 sessionId
+		2: required string type
+		3: optional string columnPath
+		4: optional i32 nodeLevel
 }
 
 struct TSGetTimeZoneResp {
@@ -219,52 +176,59 @@ struct TSGetTimeZoneResp {
 }
 
 struct TSSetTimeZoneReq {
-    1: required string timeZone
+    1: required i64 sessionId
+    2: required string timeZone
 }
 
 // for prepared statement
 struct TSInsertionReq {
-    1: optional string deviceId
-    2: optional list<string> measurements
-    3: optional list<string> values
-    4: optional i64 timestamp
-    5: required i64 stmtId
+    1: required i64 sessionId
+    2: optional string deviceId
+    3: optional list<string> measurements
+    4: optional list<string> values
+    5: optional i64 timestamp
+    6: required i64 stmtId
 }
 
 // for session
 struct TSInsertReq {
-    1: required string deviceId
-    2: required list<string> measurements
-    3: required list<string> values
-    4: required i64 timestamp
+    1: required i64 sessionId
+    2: required string deviceId
+    3: required list<string> measurements
+    4: required list<string> values
+    5: required i64 timestamp
 }
 
 struct TSBatchInsertionReq {
-    1: required string deviceId
-    2: required list<string> measurements
-    3: required binary values
-    4: required binary timestamps
-    5: required list<i32> types
-    6: required i32 size
+    1: required i64 sessionId
+    2: required string deviceId
+    3: required list<string> measurements
+    4: required binary values
+    5: required binary timestamps
+    6: required list<i32> types
+    7: required i32 size
 }
 
 struct TSInsertInBatchReq {
-    1: required list<string> deviceIds
-    2: required list<list<string>> measurementsList
-    3: required list<list<string>> valuesList
-    4: required list<i64> timestamps
+    1: required i64 sessionId
+    2: required list<string> deviceIds
+    3: required list<list<string>> measurementsList
+    4: required list<list<string>> valuesList
+    5: required list<i64> timestamps
 }
 
 struct TSDeleteDataReq {
-    1: required list<string> paths
-    2: required i64 timestamp
+    1: required i64 sessionId
+    2: required list<string> paths
+    3: required i64 timestamp
 }
 
 struct TSCreateTimeseriesReq {
-  1: required string path
-  2: required i32 dataType
-  3: required i32 encoding
-  4: required i32 compressor
+  1: required i64 sessionId
+  2: required string path
+  3: required i32 dataType
+  4: required i32 encoding
+  5: required i32 compressor
 }
 
 struct ServerProperties {
@@ -273,6 +237,22 @@ struct ServerProperties {
 	3: required string timestampPrecision;
 }
 
+struct TSQueryDataSet{
+   // ByteBuffer for time column
+   1: required binary time
+   // ByteBuffer for each column values
+   2: required list<binary> valueList
+   // Bitmap for each column to indicate whether it is a null value
+   3: required list<binary> bitmapList
+}
+
+struct TSColumnSchema{
+	1: optional string name;
+	2: optional string dataType;
+	3: optional string encoding;
+	4: optional map<string, string> otherArgs;
+}
+
 service TSIService {
 	TSOpenSessionResp openSession(1:TSOpenSessionReq req);