You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/26 13:42:35 UTC

[iotdb] branch master updated: [WIP] Extract out SessionManager From TSServiceImpl (#3454)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new affe6d9  [WIP] Extract out SessionManager From TSServiceImpl (#3454)
affe6d9 is described below

commit affe6d95ec3ddaf0689e58d0283945ac29d54830
Author: J.J. Liu <li...@gmail.com>
AuthorDate: Sat Jun 26 21:42:09 2021 +0800

    [WIP] Extract out SessionManager From TSServiceImpl (#3454)
---
 .../iotdb/db/query/control/SessionManager.java     | 178 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 139 ++++------------
 2 files changed, 213 insertions(+), 104 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
new file mode 100644
index 0000000..a2a253f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SessionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+  // Record the username for every rpc connection (session).
+  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
+  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+
+  // The sessionId is unique in one IoTDB instance.
+  private final AtomicLong sessionIdGenerator = new AtomicLong();
+  // The statementId is unique in one IoTDB instance.
+  private final AtomicLong statementIdGenerator = new AtomicLong();
+
+  // (sessionId -> Set(statementId))
+  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+  // (statementId -> Set(queryId))
+  private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+  // (queryId -> QueryDataSet)
+  private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
+
+  private SessionManager() {
+    // singleton
+  }
+
+  public long requestSessionId(String username, String zoneId) {
+    long sessionId = sessionIdGenerator.incrementAndGet();
+    sessionIdToUsername.put(sessionId, username);
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
+
+    return sessionId;
+  }
+
+  public boolean releaseSessionResource(long sessionId) {
+    sessionIdToZoneId.remove(sessionId);
+
+    for (long statementId :
+        sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
+      for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    return sessionIdToUsername.remove(sessionId) != null;
+  }
+
+  public long requestStatementId(long sessionId) {
+    long statementId = statementIdGenerator.incrementAndGet();
+    sessionIdToStatementId
+        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+        .add(statementId);
+    return statementId;
+  }
+
+  public void closeStatement(long sessionId, long statementId) {
+    Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+    if (queryIdSet != null) {
+      for (long queryId : queryIdSet) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    if (sessionIdToStatementId.containsKey(sessionId)) {
+      sessionIdToStatementId.get(sessionId).remove(statementId);
+    }
+  }
+
+  public long requestQueryId(
+      Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+    statementIdToQueryId
+        .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+        .add(queryId);
+    return queryId;
+  }
+
+  public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    return QueryResourceManager.getInstance()
+        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+  }
+
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
+    QueryDataSet dataSet = queryIdToDataSet.remove(queryId);
+    if (dataSet instanceof UDTFDataSet) {
+      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
+    }
+    QueryResourceManager.getInstance().endQuery(queryId);
+  }
+
+  public void releaseQueryResourceNoExceptions(long queryId) {
+    if (queryId != -1) {
+      try {
+        releaseQueryResource(queryId);
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred while releasing query resource: ", e);
+      }
+    }
+  }
+
+  public String getUsername(Long sessionId) {
+    return sessionIdToUsername.get(sessionId);
+  }
+
+  public ZoneId getZoneId(Long sessionId) {
+    return sessionIdToZoneId.get(sessionId);
+  }
+
+  public void setTimezone(Long sessionId, String zone) {
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+  }
+
+  public boolean hasDataset(Long queryId) {
+    return queryIdToDataSet.containsKey(queryId);
+  }
+
+  public QueryDataSet getDataset(Long queryId) {
+    return queryIdToDataSet.get(queryId);
+  }
+
+  public void setDataset(Long queryId, QueryDataSet dataSet) {
+    queryIdToDataSet.put(queryId, dataSet);
+  }
+
+  public void removeDataset(Long queryId) {
+    queryIdToDataSet.remove(queryId);
+  }
+
+  public void closeDataset(Long statementId, Long queryId) {
+    releaseQueryResourceNoExceptions(queryId);
+    if (statementIdToQueryId.containsKey(statementId)) {
+      statementIdToQueryId.get(statementId).remove(queryId);
+    }
+  }
+
+  public static SessionManager getInstance() {
+    return SessionManagerHelper.INSTANCE;
+  }
+
+  private static class SessionManagerHelper {
+
+    private static final SessionManager INSTANCE = new SessionManager();
+
+    private SessionManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6a2c8e8..f36ee5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,13 +73,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
@@ -145,19 +144,15 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /** Thrift RPC implementation at server side. */
@@ -195,21 +190,7 @@ public class TSServiceImpl implements TSIService.Iface {
   protected Planner processor;
   protected IPlanExecutor executor;
 
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
-
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
-  // The statementId is unique in one IoTDB instance.
-  private final AtomicLong statementIdGenerator = new AtomicLong();
-
-  // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
-  // (statementId -> Set(queryId))
-  private final Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
-  // (queryId -> QueryDataSet)
-  private final Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+  private SessionManager sessionManager = SessionManager.getInstance();
 
   // When the client abnormally exits, we can still know who to disconnect
   private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
@@ -277,9 +258,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
-      sessionId = sessionIdGenerator.incrementAndGet();
-      sessionIdUsernameMap.put(sessionId, req.getUsername());
-      sessionIdZoneIdMap.put(sessionId, ZoneId.of(req.getZoneId()));
+
+      sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
       currSessionId.set(sessionId);
       AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
       LOGGER.info(
@@ -310,16 +290,9 @@ public class TSServiceImpl implements TSIService.Iface {
     AUDIT_LOGGER.info("Session-{} is closing", sessionId);
 
     currSessionId.remove();
-    sessionIdZoneIdMap.remove(sessionId);
-
-    for (long statementId : sessionId2StatementId.getOrDefault(sessionId, Collections.emptySet())) {
-      for (long queryId : statementId2QueryId.getOrDefault(statementId, Collections.emptySet())) {
-        releaseQueryResourceNoExceptions(queryId);
-      }
-    }
 
     return new TSStatus(
-        sessionIdUsernameMap.remove(sessionId) == null
+        !sessionManager.releaseSessionResource(sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -344,25 +317,10 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     try {
-      // ResultSet close
       if (req.isSetStatementId() && req.isSetQueryId()) {
-        releaseQueryResourceNoExceptions(req.queryId);
-        // clear the statementId2QueryId map
-        if (statementId2QueryId.containsKey(req.getStatementId())) {
-          statementId2QueryId.get(req.getStatementId()).remove(req.getQueryId());
-        }
+        sessionManager.closeDataset(req.statementId, req.queryId);
       } else {
-        // statement close
-        Set<Long> queryIdSet = statementId2QueryId.remove(req.getStatementId());
-        if (queryIdSet != null) {
-          for (long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
-          }
-        }
-        // clear the sessionId2StatementId map
-        if (sessionId2StatementId.containsKey(req.getSessionId())) {
-          sessionId2StatementId.get(req.getSessionId()).remove(req.getStatementId());
-        }
+        sessionManager.closeStatement(req.sessionId, req.statementId);
       }
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
@@ -373,22 +331,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   /** release single operation resource */
   protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // remove the corresponding Physical Plan
-    QueryDataSet dataSet = queryId2DataSet.remove(queryId);
-    if (dataSet instanceof UDTFDataSet) {
-      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
-    }
-    QueryResourceManager.getInstance().endQuery(queryId);
-  }
-
-  private void releaseQueryResourceNoExceptions(long queryId) {
-    if (queryId != -1) {
-      try {
-        releaseQueryResource(queryId);
-      } catch (Exception e) {
-        LOGGER.warn("Error occurred while releasing query resource: ", e);
-      }
-    }
+    sessionManager.releaseQueryResource(queryId);
   }
 
   @Override
@@ -555,7 +498,7 @@ public class TSServiceImpl implements TSIService.Iface {
       try {
         PhysicalPlan physicalPlan =
             processor.parseSQLToPhysicalPlan(
-                statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+                statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
         if (physicalPlan.isQuery()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -653,7 +596,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -662,7 +605,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : executeUpdateStatement(physicalPlan, req.getSessionId());
     } catch (InterruptedException e) {
@@ -684,7 +627,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -693,7 +636,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -716,7 +659,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       PhysicalPlan physicalPlan =
-          processor.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+          processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
               "",
@@ -724,7 +667,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.sessionId),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -747,7 +690,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       PhysicalPlan physicalPlan =
-          processor.lastDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+          processor.lastDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
               "",
@@ -755,7 +698,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.sessionId),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -797,7 +740,7 @@ public class TSServiceImpl implements TSIService.Iface {
       fetchSize = p.left;
 
       // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, p.right);
+      queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
       // register query info to queryTimeManager
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -811,10 +754,6 @@ public class TSServiceImpl implements TSIService.Iface {
         }
       }
 
-      statementId2QueryId
-          .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
-          .add(queryId);
-
       if (plan instanceof AuthorPlan) {
         plan.setLoginUserName(username);
       }
@@ -905,7 +844,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
       return resp;
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(queryId);
+      sessionManager.releaseQueryResourceNoExceptions(queryId);
       throw e;
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1109,7 +1048,7 @@ public class TSServiceImpl implements TSIService.Iface {
         return RpcUtils.getTSFetchResultsResp(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      if (!queryId2DataSet.containsKey(req.queryId)) {
+      if (!sessionManager.hasDataset(req.queryId)) {
         return RpcUtils.getTSFetchResultsResp(
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
@@ -1118,13 +1057,14 @@ public class TSServiceImpl implements TSIService.Iface {
       queryTimeManager.registerQuery(
           req.queryId, System.currentTimeMillis(), req.statement, req.timeout);
 
-      QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
+      QueryDataSet queryDataSet = sessionManager.getDataset(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result =
-            fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+            fillRpcReturnData(
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          releaseQueryResourceNoExceptions(req.queryId);
+          sessionManager.releaseQueryResourceNoExceptions(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1136,7 +1076,7 @@ public class TSServiceImpl implements TSIService.Iface {
       } else {
         TSQueryNonAlignDataSet nonAlignResult =
             fillRpcNonAlignReturnData(
-                req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = false;
         for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
           if (timeBuffer.limit() != 0) {
@@ -1145,7 +1085,7 @@ public class TSServiceImpl implements TSIService.Iface {
           }
         }
         if (!hasResultSet) {
-          queryId2DataSet.remove(req.queryId);
+          sessionManager.removeDataset(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1162,7 +1102,7 @@ public class TSServiceImpl implements TSIService.Iface {
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(req.queryId);
+      sessionManager.releaseQueryResourceNoExceptions(req.queryId);
       return RpcUtils.getTSFetchResultsResp(
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
@@ -1214,7 +1154,7 @@ public class TSServiceImpl implements TSIService.Iface {
     QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
     QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
     queryDataSet.setFetchSize(fetchSize);
-    queryId2DataSet.put(queryId, queryDataSet);
+    sessionManager.setDataset(queryId, queryDataSet);
     return queryDataSet;
   }
 
@@ -1243,7 +1183,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
     status = executeNonQueryPlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
-    long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
+    long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
     return resp.setQueryId(queryId);
   }
 
@@ -1260,7 +1200,7 @@ public class TSServiceImpl implements TSIService.Iface {
       throws QueryProcessException {
     PhysicalPlan physicalPlan =
         processor.parseSQLToPhysicalPlan(
-            statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+            statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
     return physicalPlan.isQuery()
         ? RpcUtils.getTSExecuteStatementResp(
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1273,7 +1213,7 @@ public class TSServiceImpl implements TSIService.Iface {
    * @return true: If logged in; false: If not logged in
    */
   private boolean checkLogin(long sessionId) {
-    boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
+    boolean isLoggedIn = sessionManager.getZoneId(sessionId) != null;
     if (!isLoggedIn) {
       LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
     }
@@ -1300,7 +1240,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = sessionIdZoneIdMap.get(sessionId);
+      ZoneId zoneId = sessionManager.getZoneId(sessionId);
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1315,7 +1255,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      sessionIdZoneIdMap.put(req.getSessionId(), ZoneId.of(req.getTimeZone()));
+      sessionManager.setTimezone(req.sessionId, req.timeZone);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(e, "setting time zone", TSStatusCode.SET_TIME_ZONE_ERROR);
@@ -1947,11 +1887,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public long requestStatementId(long sessionId) {
-    long statementId = statementIdGenerator.incrementAndGet();
-    sessionId2StatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
-        .add(statementId);
-    return statementId;
+    return sessionManager.requestStatementId(sessionId);
   }
 
   @Override
@@ -2036,7 +1972,7 @@ public class TSServiceImpl implements TSIService.Iface {
   private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
     List<PartialPath> paths = plan.getPaths();
     try {
-      if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
+      if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation " + plan.getOperatorType());
@@ -2065,11 +2001,6 @@ public class TSServiceImpl implements TSIService.Iface {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
-    return QueryResourceManager.getInstance()
-        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
-  }
-
   protected List<TSDataType> getSeriesTypesByPaths(
       List<PartialPath> paths, List<String> aggregations) throws MetadataException {
     return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);