You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/26 13:42:35 UTC
[iotdb] branch master updated: [WIP] Extract out SessionManager
From TSServiceImpl (#3454)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new affe6d9 [WIP] Extract out SessionManager From TSServiceImpl (#3454)
affe6d9 is described below
commit affe6d95ec3ddaf0689e58d0283945ac29d54830
Author: J.J. Liu <li...@gmail.com>
AuthorDate: Sat Jun 26 21:42:09 2021 +0800
[WIP] Extract out SessionManager From TSServiceImpl (#3454)
---
.../iotdb/db/query/control/SessionManager.java | 178 +++++++++++++++++++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 139 ++++------------
2 files changed, 213 insertions(+), 104 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
new file mode 100644
index 0000000..a2a253f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SessionManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+ // Record the username for every rpc connection (session).
+ private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
+ private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+
+ // The sessionId is unique in one IoTDB instance.
+ private final AtomicLong sessionIdGenerator = new AtomicLong();
+ // The statementId is unique in one IoTDB instance.
+ private final AtomicLong statementIdGenerator = new AtomicLong();
+
+ // (sessionId -> Set(statementId))
+ private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+ // (statementId -> Set(queryId))
+ private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+ // (queryId -> QueryDataSet)
+ private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
+
+ private SessionManager() {
+ // singleton
+ }
+
+ public long requestSessionId(String username, String zoneId) {
+ long sessionId = sessionIdGenerator.incrementAndGet();
+ sessionIdToUsername.put(sessionId, username);
+ sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
+
+ return sessionId;
+ }
+
+ public boolean releaseSessionResource(long sessionId) {
+ sessionIdToZoneId.remove(sessionId);
+
+ for (long statementId :
+ sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
+ for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
+ releaseQueryResourceNoExceptions(queryId);
+ }
+ }
+
+ return sessionIdToUsername.remove(sessionId) != null;
+ }
+
+ public long requestStatementId(long sessionId) {
+ long statementId = statementIdGenerator.incrementAndGet();
+ sessionIdToStatementId
+ .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+ .add(statementId);
+ return statementId;
+ }
+
+ public void closeStatement(long sessionId, long statementId) {
+ Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+ if (queryIdSet != null) {
+ for (long queryId : queryIdSet) {
+ releaseQueryResourceNoExceptions(queryId);
+ }
+ }
+
+ if (sessionIdToStatementId.containsKey(sessionId)) {
+ sessionIdToStatementId.get(sessionId).remove(statementId);
+ }
+ }
+
+ public long requestQueryId(
+ Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+ long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ statementIdToQueryId
+ .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+ .add(queryId);
+ return queryId;
+ }
+
+ public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+ return QueryResourceManager.getInstance()
+ .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ }
+
+ public void releaseQueryResource(long queryId) throws StorageEngineException {
+ QueryDataSet dataSet = queryIdToDataSet.remove(queryId);
+ if (dataSet instanceof UDTFDataSet) {
+ ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
+ }
+ QueryResourceManager.getInstance().endQuery(queryId);
+ }
+
+ public void releaseQueryResourceNoExceptions(long queryId) {
+ if (queryId != -1) {
+ try {
+ releaseQueryResource(queryId);
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred while releasing query resource: ", e);
+ }
+ }
+ }
+
+ public String getUsername(Long sessionId) {
+ return sessionIdToUsername.get(sessionId);
+ }
+
+ public ZoneId getZoneId(Long sessionId) {
+ return sessionIdToZoneId.get(sessionId);
+ }
+
+ public void setTimezone(Long sessionId, String zone) {
+ sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+ }
+
+ public boolean hasDataset(Long queryId) {
+ return queryIdToDataSet.containsKey(queryId);
+ }
+
+ public QueryDataSet getDataset(Long queryId) {
+ return queryIdToDataSet.get(queryId);
+ }
+
+ public void setDataset(Long queryId, QueryDataSet dataSet) {
+ queryIdToDataSet.put(queryId, dataSet);
+ }
+
+ public void removeDataset(Long queryId) {
+ queryIdToDataSet.remove(queryId);
+ }
+
+ public void closeDataset(Long statementId, Long queryId) {
+ releaseQueryResourceNoExceptions(queryId);
+ if (statementIdToQueryId.containsKey(statementId)) {
+ statementIdToQueryId.get(statementId).remove(queryId);
+ }
+ }
+
+ public static SessionManager getInstance() {
+ return SessionManagerHelper.INSTANCE;
+ }
+
+ private static class SessionManagerHelper {
+
+ private static final SessionManager INSTANCE = new SessionManager();
+
+ private SessionManagerHelper() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6a2c8e8..f36ee5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,13 +73,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
@@ -145,19 +144,15 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
-import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** Thrift RPC implementation at server side. */
@@ -195,21 +190,7 @@ public class TSServiceImpl implements TSIService.Iface {
protected Planner processor;
protected IPlanExecutor executor;
- // Record the username for every rpc connection (session).
- private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
- private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
-
- // The sessionId is unique in one IoTDB instance.
- private final AtomicLong sessionIdGenerator = new AtomicLong();
- // The statementId is unique in one IoTDB instance.
- private final AtomicLong statementIdGenerator = new AtomicLong();
-
- // (sessionId -> Set(statementId))
- private final Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
- // (statementId -> Set(queryId))
- private final Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
- // (queryId -> QueryDataSet)
- private final Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+ private SessionManager sessionManager = SessionManager.getInstance();
// When the client abnormally exits, we can still know who to disconnect
private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
@@ -277,9 +258,8 @@ public class TSServiceImpl implements TSIService.Iface {
}
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
- sessionId = sessionIdGenerator.incrementAndGet();
- sessionIdUsernameMap.put(sessionId, req.getUsername());
- sessionIdZoneIdMap.put(sessionId, ZoneId.of(req.getZoneId()));
+
+ sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
currSessionId.set(sessionId);
AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
LOGGER.info(
@@ -310,16 +290,9 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.info("Session-{} is closing", sessionId);
currSessionId.remove();
- sessionIdZoneIdMap.remove(sessionId);
-
- for (long statementId : sessionId2StatementId.getOrDefault(sessionId, Collections.emptySet())) {
- for (long queryId : statementId2QueryId.getOrDefault(statementId, Collections.emptySet())) {
- releaseQueryResourceNoExceptions(queryId);
- }
- }
return new TSStatus(
- sessionIdUsernameMap.remove(sessionId) == null
+ !sessionManager.releaseSessionResource(sessionId)
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -344,25 +317,10 @@ public class TSServiceImpl implements TSIService.Iface {
}
try {
- // ResultSet close
if (req.isSetStatementId() && req.isSetQueryId()) {
- releaseQueryResourceNoExceptions(req.queryId);
- // clear the statementId2QueryId map
- if (statementId2QueryId.containsKey(req.getStatementId())) {
- statementId2QueryId.get(req.getStatementId()).remove(req.getQueryId());
- }
+ sessionManager.closeDataset(req.statementId, req.queryId);
} else {
- // statement close
- Set<Long> queryIdSet = statementId2QueryId.remove(req.getStatementId());
- if (queryIdSet != null) {
- for (long queryId : queryIdSet) {
- releaseQueryResourceNoExceptions(queryId);
- }
- }
- // clear the sessionId2StatementId map
- if (sessionId2StatementId.containsKey(req.getSessionId())) {
- sessionId2StatementId.get(req.getSessionId()).remove(req.getStatementId());
- }
+ sessionManager.closeStatement(req.sessionId, req.statementId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
@@ -373,22 +331,7 @@ public class TSServiceImpl implements TSIService.Iface {
/** release single operation resource */
protected void releaseQueryResource(long queryId) throws StorageEngineException {
- // remove the corresponding Physical Plan
- QueryDataSet dataSet = queryId2DataSet.remove(queryId);
- if (dataSet instanceof UDTFDataSet) {
- ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
- }
- QueryResourceManager.getInstance().endQuery(queryId);
- }
-
- private void releaseQueryResourceNoExceptions(long queryId) {
- if (queryId != -1) {
- try {
- releaseQueryResource(queryId);
- } catch (Exception e) {
- LOGGER.warn("Error occurred while releasing query resource: ", e);
- }
- }
+ sessionManager.releaseQueryResource(queryId);
}
@Override
@@ -555,7 +498,7 @@ public class TSServiceImpl implements TSIService.Iface {
try {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+ statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
if (physicalPlan.isQuery()) {
throw new QueryInBatchStatementException(statement);
}
@@ -653,7 +596,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+ statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -662,7 +605,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.getSessionId()),
req.isEnableRedirectQuery())
: executeUpdateStatement(physicalPlan, req.getSessionId());
} catch (InterruptedException e) {
@@ -684,7 +627,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+ statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -693,7 +636,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.getSessionId()),
req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -716,7 +659,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
PhysicalPlan physicalPlan =
- processor.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+ processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
"",
@@ -724,7 +667,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
config.getQueryTimeoutThreshold(),
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.sessionId),
req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -747,7 +690,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
PhysicalPlan physicalPlan =
- processor.lastDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+ processor.lastDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
"",
@@ -755,7 +698,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
config.getQueryTimeoutThreshold(),
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.sessionId),
req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -797,7 +740,7 @@ public class TSServiceImpl implements TSIService.Iface {
fetchSize = p.left;
// generate the queryId for the operation
- queryId = generateQueryId(true, fetchSize, p.right);
+ queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
// register query info to queryTimeManager
if (!(plan instanceof ShowQueryProcesslistPlan)) {
queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -811,10 +754,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- statementId2QueryId
- .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
- .add(queryId);
-
if (plan instanceof AuthorPlan) {
plan.setLoginUserName(username);
}
@@ -905,7 +844,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
return resp;
} catch (Exception e) {
- releaseQueryResourceNoExceptions(queryId);
+ sessionManager.releaseQueryResourceNoExceptions(queryId);
throw e;
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1109,7 +1048,7 @@ public class TSServiceImpl implements TSIService.Iface {
return RpcUtils.getTSFetchResultsResp(TSStatusCode.NOT_LOGIN_ERROR);
}
- if (!queryId2DataSet.containsKey(req.queryId)) {
+ if (!sessionManager.hasDataset(req.queryId)) {
return RpcUtils.getTSFetchResultsResp(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
}
@@ -1118,13 +1057,14 @@ public class TSServiceImpl implements TSIService.Iface {
queryTimeManager.registerQuery(
req.queryId, System.currentTimeMillis(), req.statement, req.timeout);
- QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
+ QueryDataSet queryDataSet = sessionManager.getDataset(req.queryId);
if (req.isAlign) {
TSQueryDataSet result =
- fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+ fillRpcReturnData(
+ req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
- releaseQueryResourceNoExceptions(req.queryId);
+ sessionManager.releaseQueryResourceNoExceptions(req.queryId);
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
resp.setHasResultSet(hasResultSet);
@@ -1136,7 +1076,7 @@ public class TSServiceImpl implements TSIService.Iface {
} else {
TSQueryNonAlignDataSet nonAlignResult =
fillRpcNonAlignReturnData(
- req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+ req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
boolean hasResultSet = false;
for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
if (timeBuffer.limit() != 0) {
@@ -1145,7 +1085,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
if (!hasResultSet) {
- queryId2DataSet.remove(req.queryId);
+ sessionManager.removeDataset(req.queryId);
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
resp.setHasResultSet(hasResultSet);
@@ -1162,7 +1102,7 @@ public class TSServiceImpl implements TSIService.Iface {
onNPEOrUnexpectedException(
e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
} catch (Exception e) {
- releaseQueryResourceNoExceptions(req.queryId);
+ sessionManager.releaseQueryResourceNoExceptions(req.queryId);
return RpcUtils.getTSFetchResultsResp(
onNPEOrUnexpectedException(
e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
@@ -1214,7 +1154,7 @@ public class TSServiceImpl implements TSIService.Iface {
QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
queryDataSet.setFetchSize(fetchSize);
- queryId2DataSet.put(queryId, queryDataSet);
+ sessionManager.setDataset(queryId, queryDataSet);
return queryDataSet;
}
@@ -1243,7 +1183,7 @@ public class TSServiceImpl implements TSIService.Iface {
status = executeNonQueryPlan(plan);
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
- long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
+ long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
return resp.setQueryId(queryId);
}
@@ -1260,7 +1200,7 @@ public class TSServiceImpl implements TSIService.Iface {
throws QueryProcessException {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+ statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1273,7 +1213,7 @@ public class TSServiceImpl implements TSIService.Iface {
* @return true: If logged in; false: If not logged in
*/
private boolean checkLogin(long sessionId) {
- boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
+ boolean isLoggedIn = sessionManager.getZoneId(sessionId) != null;
if (!isLoggedIn) {
LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
}
@@ -1300,7 +1240,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = sessionIdZoneIdMap.get(sessionId);
+ ZoneId zoneId = sessionManager.getZoneId(sessionId);
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1315,7 +1255,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- sessionIdZoneIdMap.put(req.getSessionId(), ZoneId.of(req.getTimeZone()));
+ sessionManager.setTimezone(req.sessionId, req.timeZone);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(e, "setting time zone", TSStatusCode.SET_TIME_ZONE_ERROR);
@@ -1947,11 +1887,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public long requestStatementId(long sessionId) {
- long statementId = statementIdGenerator.incrementAndGet();
- sessionId2StatementId
- .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
- .add(statementId);
- return statementId;
+ return sessionManager.requestStatementId(sessionId);
}
@Override
@@ -2036,7 +1972,7 @@ public class TSServiceImpl implements TSIService.Iface {
private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
List<PartialPath> paths = plan.getPaths();
try {
- if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
+ if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
return RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation " + plan.getOperatorType());
@@ -2065,11 +2001,6 @@ public class TSServiceImpl implements TSIService.Iface {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
- private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- return QueryResourceManager.getInstance()
- .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
- }
-
protected List<TSDataType> getSeriesTypesByPaths(
List<PartialPath> paths, List<String> aggregations) throws MetadataException {
return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);