You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/28 12:05:29 UTC

[iotdb] 01/01: [IOTDB-4741] [IOTDB-4767] Support fetching all connection info in Session & SessionPool

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4741
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c4536e4ce9c854e79efc9f7c230df2982421ea21
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 28 20:05:08 2022 +0800

    [IOTDB-4741] [IOTDB-4767] Support fetching all connection info in Session & SessionPool
---
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |   4 +-
 client-py/setup.py                                 |   2 +-
 external-api/pom.xml                               |   8 +
 .../external/api/thrift/JudgableServerContext.java |  44 ++++
 .../external/api/thrift/ServerContextFactory.java  |  27 ++
 integration-test/import-control.xml                |   2 +
 .../iotdb/session/it/IoTDBConnectionInfoIT.java    |  66 +++++
 jdbc/src/main/feature/feature.xml                  |   2 +-
 .../apache/iotdb/commons/service/ServiceType.java  |  29 +--
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   5 +-
 .../iotdb/db/client/DataNodeInternalClient.java    |  27 +-
 .../db/protocol/influxdb/handler/QueryHandler.java |   4 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |  27 +-
 .../iotdb/db/protocol/mqtt/PublishHandler.java     |  35 +--
 .../iotdb/db/qp/physical/crud/GroupByTimePlan.java |   2 +-
 .../apache/iotdb/db/qp/utils/DateTimeUtils.java    |   4 +-
 .../iotdb/db/query/control/SessionManager.java     | 249 +++++++++++--------
 .../db/query/control/SessionManagerMBean.java      |  30 +++
 .../db/query/control/SessionTimeoutManager.java    |  39 ++-
 .../query/control/clientsession/ClientSession.java |  86 +++++++
 .../control/clientsession/IClientSession.java      | 133 ++++++++++
 .../clientsession/InternalClientSession.java       |  72 ++++++
 .../control/clientsession/MqttClientSession.java   |  66 +++++
 .../apache/iotdb/db/query/executor/fill/IFill.java |   2 +-
 .../thrift/handler/BaseServerContextHandler.java   |  79 ++++++
 .../handler/InfluxDBServiceThriftHandler.java      |  11 +-
 .../thrift/handler/RPCServiceThriftHandler.java    |  19 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 273 ++++++++++-----------
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  30 ++-
 .../db/service/thrift/impl/TSServiceImpl.java      | 246 ++++++++++---------
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |   4 +
 .../rpc/TimeoutChangeableTFastFramedTransport.java |   8 +-
 .../TimeoutChangeableTSnappyFramedTransport.java   |   8 +-
 .../java/org/apache/iotdb/session/ISession.java    |   3 +
 .../java/org/apache/iotdb/session/Session.java     |   6 +
 .../apache/iotdb/session/SessionConnection.java    |  17 ++
 .../org/apache/iotdb/session/pool/SessionPool.java |  21 ++
 thrift/src/main/thrift/client.thrift               |  19 ++
 39 files changed, 1246 insertions(+), 465 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 309a5bc85c..74ad1c83b0 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -581,7 +581,7 @@ public abstract class AbstractCli {
    * @param columnCount the number of column
    * @param resultSetMetaData jdbc resultSetMetaData
    * @param zoneId your time zone
-   * @return List<List<String>> result
+   * @return {@literal List<List<String>> result}
    * @throws SQLException throw exception
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index 333be8ca25..51ece1e394 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -745,8 +745,8 @@ public class ImportCsv extends AbstractCsvTool {
    * read data from the CSV file
    *
    * @param path
-   * @return
-   * @throws IOException
+   * @return CSVParser csv parser
+   * @throws IOException when reading the csv file failed.
    */
   private static CSVParser readCsvFile(String path) throws IOException {
     return CSVFormat.Builder.create(CSVFormat.DEFAULT)
diff --git a/client-py/setup.py b/client-py/setup.py
index a3f147af3c..0e67070dde 100644
--- a/client-py/setup.py
+++ b/client-py/setup.py
@@ -31,7 +31,7 @@ print(long_description)
 
 setuptools.setup(
     name="apache-iotdb",  # Replace with your own username
-    version="0.13.0",
+    version="0.14.0",
     author=" Apache Software Foundation",
     author_email="dev@iotdb.apache.org",
     description="Apache IoTDB client API",
diff --git a/external-api/pom.xml b/external-api/pom.xml
index 354c820513..9dfe2c082a 100644
--- a/external-api/pom.xml
+++ b/external-api/pom.xml
@@ -28,6 +28,14 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>external-api</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${thrift.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
     <profiles>
         <profile>
             <id>get-jar-with-dependencies</id>
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
new file mode 100644
index 0000000000..a8020da267
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.server.ServerContext;
+
+public interface JudgableServerContext extends ServerContext {
+
+  /**
+   * This method will be called when a client connects to the IoTDB server.
+   *
+   * @return false if we do not allow this connection
+   */
+  boolean whenConnect();
+
+  /** @return false if we do not allow this connection -- TODO confirm meaning on disconnect */
+  boolean whenDisconnect();
+
+  @Override
+  default <T> T unwrap(Class<T> iface) {
+    return null; // default: no wrapped object is exposed
+  }
+
+  @Override
+  default boolean isWrapperFor(Class<?> iface) {
+    return false; // default: this context reports wrapping nothing
+  }
+}
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
new file mode 100644
index 0000000000..048a93ca8f
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.protocol.TProtocol;
+
+import java.net.Socket;
+
+public interface ServerContextFactory { // builds JudgableServerContext instances
+  JudgableServerContext newServerContext(TProtocol out, Socket socket);
+}
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index 532062face..cf83b6dff7 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -82,6 +82,8 @@
     <allow class="org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService" />
     <allow class="org.apache.iotdb.db.exception.TriggerManagementException" />
     <allow class="org.apache.iotdb.tsfile.common.constant.TsFileConstant" />
+    <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp" />
+    <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionType" />
     <allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true" />
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java
new file mode 100644
index 0000000000..9a45ea93ed
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionInfoIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+import org.apache.iotdb.session.ISession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBConnectionInfoIT {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeTest();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterTest();
+  }
+
+  @Test
+  public void testGetBackupConfiguration() {
+    try (ISession session1 = EnvFactory.getEnv().getSessionConnection()) {
+      TSConnectionInfoResp resp1 = session1.fetchAllConnections();
+      // each reported connection must be thrift-based, owned by root, id starting 127.0.0.1
+      for (int i = 0; i < resp1.connectionInfoList.size(); i++) {
+        assertEquals("root", resp1.connectionInfoList.get(i).userName);
+        assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(i).type);
+        assertTrue(resp1.connectionInfoList.get(i).connectionId.startsWith("127.0.0.1"));
+      }
+    } catch (Exception e) {
+      fail(e.toString()); // keep the failure cause visible in the test report
+    }
+  }
+}
diff --git a/jdbc/src/main/feature/feature.xml b/jdbc/src/main/feature/feature.xml
index 933a75171e..2ce90e664a 100644
--- a/jdbc/src/main/feature/feature.xml
+++ b/jdbc/src/main/feature/feature.xml
@@ -18,7 +18,7 @@
 
 -->
 <features xmlns="http://karaf.apache.org/xmlns/features/v1.5.0" name="driver-s7-feature">
-    <feature name="iotdb-feature" description="iotdb-feature" version="0.10.0.SNAPSHOT">
+    <feature name="iotdb-feature" description="iotdb-feature" version="0.14.0.SNAPSHOT">
         <details>Feature to install required Bundle to use IoTDB inside Karaf container</details>
         <feature prerequisite="true">wrap</feature>
         <feature>scr</feature>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index dc9f7a3c16..1734f1da47 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -20,30 +20,31 @@
 package org.apache.iotdb.commons.service;
 
 public enum ServiceType {
-  STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
+  STORAGE_ENGINE_SERVICE("Storage Engine ServerService", "StorageEngine"),
   JMX_SERVICE("JMX ServerService", "JMX ServerService"),
   METRIC_SERVICE("Metrics ServerService", "MetricService"),
   RPC_SERVICE("RPC ServerService", "RPCService"),
   INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
-  MQTT_SERVICE("MQTTService", ""),
+  MQTT_SERVICE("MQTTService", "MqttService"),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
-  STAT_MONITOR_SERVICE("Statistics ServerService", ""),
-  WAL_SERVICE("WAL ServerService", ""),
-  CLOSE_MERGE_SERVICE("Close&Merge ServerService", ""),
-  JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
-  AUTHORIZATION_SERVICE("Authorization ServerService", ""),
-  FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
-  UPGRADE_SERVICE("UPGRADE DataService", ""),
-  SETTLE_SERVICE("SETTLE DataService", ""),
+  STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
+  WAL_SERVICE("WAL ServerService", "WalService"),
+  CLOSE_MERGE_SERVICE("Close&Merge ServerService", "CloseMergeService"),
+  JVM_MEM_CONTROL_SERVICE("Memory Controller", "JvmMemControlService"),
+  AUTHORIZATION_SERVICE("Authorization ServerService", "AuthService"),
+  FILE_READER_MANAGER_SERVICE("File reader manager ServerService", "FileReaderManagerService"),
+  UPGRADE_SERVICE("UPGRADE DataService", "UpgradeService"),
+  SETTLE_SERVICE("SETTLE DataService", "SettleService"),
   SYNC_RPC_SERVICE("Sync RPC ServerService", ""),
-  SYNC_SERVICE("Sync Service", ""),
+  SYNC_SERVICE("Sync Service", "SyncService"),
   MERGE_SERVICE("Merge Manager", "Merge Manager"),
   COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
-  UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
-  TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
-  TRIGGER_REGISTRATION_SERVICE_OLD("Old Standalone Trigger Registration Service", ""),
+  UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", "UdfClassLoader"),
+  TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", "TempQueryDataFile"),
+  TRIGGER_REGISTRATION_SERVICE_OLD(
+      "Old Standalone Trigger Registration Service", "TriggerRegistration"),
   CACHE_HIT_RATIO_DISPLAY_SERVICE(
       "CACHE_HIT_RATIO_DISPLAY_SERVICE",
       generateJmxName("org.apache.iotdb.service", "Cache Hit Ratio")),
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 33160063b5..d78d18d122 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -149,9 +150,9 @@ public class AuthorityChecker {
   }
 
   /** Check whether specific Session has the authorization to given plan. */
-  public static TSStatus checkAuthority(Statement statement, long sessionId) {
+  public static TSStatus checkAuthority(Statement statement, IClientSession session) {
     try {
-      if (!checkAuthorization(statement, sessionManager.getUsername(sessionId))) {
+      if (!checkAuthorization(statement, session.getUsername())) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation, please add privilege "
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index ff905f7a63..2ca17dd01a 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -38,6 +38,8 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
+import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -60,7 +62,7 @@ public class DataNodeInternalClient {
 
   private final ISchemaFetcher SCHEMA_FETCHER;
 
-  private final long sessionId;
+  private final IClientSession session;
 
   public DataNodeInternalClient(SessionInfo sessionInfo) {
     if (config.isClusterMode()) {
@@ -72,13 +74,16 @@ public class DataNodeInternalClient {
     }
 
     try {
-      sessionId =
-          SESSION_MANAGER.requestSessionId(
-              sessionInfo.getUserName(),
-              sessionInfo.getZoneId(),
-              IoTDBConstant.ClientVersion.V_0_13);
+      session = new InternalClientSession("SELECT_INTO");
 
-      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+      SESSION_MANAGER.registerSession(session);
+      SESSION_MANAGER.supplySession(
+          session,
+          sessionInfo.getUserName(),
+          sessionInfo.getZoneId(),
+          IoTDBConstant.ClientVersion.V_0_13);
+
+      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), session);
     } catch (Exception e) {
       LOGGER.warn("User {} opens internal Session failed.", sessionInfo.getUserName(), e);
       throw new IntoProcessException(
@@ -89,7 +94,7 @@ public class DataNodeInternalClient {
   public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
     try {
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      TSStatus status = AuthorityChecker.checkAuthority(statement, session);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -100,7 +105,7 @@ public class DataNodeInternalClient {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
+              SESSION_MANAGER.getSessionInfo(session),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -112,8 +117,8 @@ public class DataNodeInternalClient {
   }
 
   public void close() {
-    SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
-    SESSION_MANAGER.closeSession(sessionId);
+    SESSION_MANAGER.releaseSessionResource(session, this::cleanupQueryExecution);
+    SESSION_MANAGER.closeSession(session);
   }
 
   private void cleanupQueryExecution(Long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
index 024fa011a6..40297a0420 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
@@ -466,7 +466,9 @@ public class QueryHandler extends AbstractQueryHandler {
     try {
       QueryPlan queryPlan =
           (QueryPlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(querySql);
-      TSStatus tsStatus = SessionManager.getInstance().checkAuthority(queryPlan, sessionId);
+      TSStatus tsStatus =
+          SessionManager.getInstance()
+              .checkAuthority(queryPlan, SessionManager.getInstance().getCurrSession());
       if (tsStatus != null) {
         throw new AuthException(tsStatus.getMessage());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a28bde43a7..0ffae5e778 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -58,7 +59,9 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
-  private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+      new ConcurrentHashMap<>();
   private final PayloadFormatter payloadFormat;
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
@@ -81,15 +84,17 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onConnect(InterceptConnectMessage msg) {
-    if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
       try {
+        MqttClientSession session = new MqttClientSession(msg.getClientID());
         BasicOpenSessionResp basicOpenSessionResp =
-            SESSION_MANAGER.openSession(
+            SESSION_MANAGER.login(
+                session,
                 msg.getUsername(),
                 new String(msg.getPassword()),
                 ZoneId.systemDefault().toString(),
                 TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
-        clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+        clientIdToSessionMap.put(msg.getClientID(), session);
       } catch (TException e) {
         throw new RuntimeException(e);
       }
@@ -98,19 +103,19 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onDisconnect(InterceptDisconnectMessage msg) {
-    Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
-    if (null != sessionId) {
-      SESSION_MANAGER.closeSession(sessionId);
+    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+    if (null != session) {
+      SESSION_MANAGER.closeSession(session);
     }
   }
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
     String clientId = msg.getClientID();
-    if (!clientIdToSessionIdMap.containsKey(clientId)) {
+    if (!clientIdToSessionMap.containsKey(clientId)) {
       return;
     }
-    long sessionId = clientIdToSessionIdMap.get(msg.getClientID());
+    MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
     ByteBuf payload = msg.getPayload();
     String topic = msg.getTopicName();
     String username = msg.getUsername();
@@ -145,7 +150,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
         statement.setNeedInferType(true);
         statement.setAligned(false);
 
-        tsStatus = AuthorityChecker.checkAuthority(statement, sessionId);
+        tsStatus = AuthorityChecker.checkAuthority(statement, session);
         if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           LOG.warn(tsStatus.message);
         } else {
@@ -155,7 +160,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
                   .execute(
                       statement,
                       queryId,
-                      SESSION_MANAGER.getSessionInfo(sessionId),
+                      SESSION_MANAGER.getSessionInfo(session),
                       "",
                       partitionFetcher,
                       schemaFetcher,
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 2a1f544101..5cb35b085a 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 
 import io.moquette.interception.AbstractInterceptHandler;
@@ -45,7 +45,8 @@ public class PublishHandler extends AbstractInterceptHandler {
   private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
   private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+      new ConcurrentHashMap<>();
 
   private final PayloadFormatter payloadFormat;
 
@@ -60,15 +61,17 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onConnect(InterceptConnectMessage msg) {
-    if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
       try {
-        BasicOpenSessionResp basicOpenSessionResp =
-            SESSION_MANAGER.openSession(
-                msg.getUsername(),
-                new String(msg.getPassword()),
-                ZoneId.systemDefault().toString(),
-                TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
-        clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+        MqttClientSession session = new MqttClientSession(msg.getClientID());
+        // TODO should we put this session into a ThreadLocal in SessionManager?
+        SESSION_MANAGER.login(
+            session,
+            msg.getUsername(),
+            new String(msg.getPassword()),
+            ZoneId.systemDefault().toString(),
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        clientIdToSessionMap.put(msg.getClientID(), session);
       } catch (TException e) {
         throw new RuntimeException(e);
       }
@@ -77,19 +80,19 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onDisconnect(InterceptDisconnectMessage msg) {
-    Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
-    if (null != sessionId) {
-      SESSION_MANAGER.closeSession(sessionId);
+    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+    if (null != session) {
+      SESSION_MANAGER.closeSession(session);
     }
   }
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
     String clientId = msg.getClientID();
-    if (!clientIdToSessionIdMap.containsKey(clientId)) {
+    if (!clientIdToSessionMap.containsKey(clientId)) {
       return;
     }
-    long sessionId = clientIdToSessionIdMap.get(msg.getClientID());
+    MqttClientSession session = clientIdToSessionMap.get(clientId);
     ByteBuf payload = msg.getPayload();
     String topic = msg.getTopicName();
     String username = msg.getUsername();
@@ -124,7 +127,7 @@ public class PublishHandler extends AbstractInterceptHandler {
                 event.getTimestamp(),
                 event.getMeasurements().toArray(new String[0]),
                 event.getValues().toArray(new String[0]));
-        TSStatus tsStatus = SESSION_MANAGER.checkAuthority(plan, sessionId);
+        TSStatus tsStatus = SESSION_MANAGER.checkAuthority(plan, session);
         if (tsStatus != null) {
           LOG.warn(tsStatus.message);
         } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index 0bb4d62483..6d0271eb13 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan {
               plan.getEndTime(),
               plan.isSlidingStepByMonth(),
               plan.isIntervalByMonth(),
-              SessionManager.getInstance().getCurrSessionTimeZone())));
+              SessionManager.getInstance().getSessionTimeZone())));
     } else {
       return new GlobalTimeExpression(
           new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
index 04cfad9ccb..6ae319e698 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
@@ -589,7 +589,7 @@ public class DateTimeUtils {
           res *= 30 * 86_400_000L;
         } else {
           Calendar calendar = Calendar.getInstance();
-          calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+          calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
           calendar.setTimeInMillis(currentTime);
           calendar.add(Calendar.MONTH, (int) (value));
           res = calendar.getTimeInMillis() - currentTime;
@@ -744,7 +744,7 @@ public class DateTimeUtils {
    */
   public static long calcIntervalByMonth(long startTime, long numMonths) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     boolean isLastDayOfMonth =
         calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 53f140d317..1270cd9e57 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.conf.OperationType;
@@ -30,10 +30,13 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
@@ -42,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
@@ -49,47 +53,47 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
-public class SessionManager {
+public class SessionManager implements SessionManagerMBean {
   private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
   public static final Logger AUDIT_LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
   // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+  /** currSession can be only used in client-thread model services. */
+  private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
 
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
+  // sessions does not contain MqttSessions.
+  private final Map<IClientSession, Object> sessions = new ConcurrentHashMap<>();
+  // used for sessions.
+  private final Object placeHolder = new Object();
+
+  // we keep this sessionIdGenerator only to stay compatible with v0.13
+  @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
   // The statementId is unique in one IoTDB instance.
   private final AtomicLong statementIdGenerator = new AtomicLong();
 
-  // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
   // (statementId -> Set(queryId))
   private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  // (sessionId -> client version number)
-  private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
-      new ConcurrentHashMap<>();
-
-  // TODO sessionIdToUsername and sessionIdToZoneId should be replaced with this
-  private final Map<Long, SessionInfo> sessionIdToSessionInfo = new ConcurrentHashMap<>();
-
   public static final TSProtocolVersion CURRENT_RPC_VERSION =
       TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
 
   protected SessionManager() {
     // singleton
+    String mbeanName =
+        String.format(
+            "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "RpcSession");
+    JMXService.registerMBean(this, mbeanName);
   }
 
-  public BasicOpenSessionResp openSession(
+  public BasicOpenSessionResp login(
+      IClientSession session,
       String username,
       String password,
       String zoneId,
@@ -115,12 +119,12 @@ public class SessionManager {
             .setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode())
             .setMessage("The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
       } else {
-        long sessionId = requestSessionId(username, zoneId, clientVersion);
+        supplySession(session, username, zoneId, clientVersion);
 
-        SessionTimeoutManager.getInstance().register(sessionId);
+        SessionTimeoutManager.getInstance().register(session);
 
         openSessionResp
-            .sessionId(sessionId)
+            .sessionId(session.getId())
             .setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())
             .setMessage("Login successfully");
 
@@ -129,7 +133,7 @@ public class SessionManager {
             IoTDBConstant.GLOBAL_DB_NAME,
             openSessionResp.getMessage(),
             username,
-            sessionId);
+            session);
       }
     } else {
       AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
@@ -142,27 +146,31 @@ public class SessionManager {
     return openSessionResp;
   }
 
-  public BasicOpenSessionResp openSession(
-      String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+  public BasicOpenSessionResp login(
+      IClientSession session,
+      String username,
+      String password,
+      String zoneId,
+      TSProtocolVersion tsProtocolVersion)
       throws TException {
-    return openSession(
-        username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+    return login(
+        session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
   }
 
-  public boolean closeSession(long sessionId) {
-    AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-    currSessionId.remove();
-    return SessionTimeoutManager.getInstance().unregister(sessionId);
+  public boolean closeSession(IClientSession session) {
+    AUDIT_LOGGER.info("Session-{} is closing", session);
+    currSession.remove();
+    return SessionTimeoutManager.getInstance().unregister(session);
   }
 
   public TSStatus closeOperation(
-      long sessionId,
+      IClientSession session,
       long queryId,
       long statementId,
       boolean haveStatementId,
       boolean haveSetQueryId,
       Consumer<Long> releaseByQueryId) {
-    if (!checkLogin(sessionId)) {
+    if (!checkLogin(session)) {
       return RpcUtils.getStatus(
           TSStatusCode.NOT_LOGIN_ERROR,
           "Log in failed. Either you are not authorized or the session has timed out.");
@@ -172,7 +180,7 @@ public class SessionManager {
       AUDIT_LOGGER.debug(
           "{}: receive close operation from Session {}",
           IoTDBConstant.GLOBAL_DB_NAME,
-          currSessionId);
+          currSession.get());
     }
 
     try {
@@ -180,7 +188,7 @@ public class SessionManager {
         if (haveSetQueryId) {
           this.closeDataset(statementId, queryId, releaseByQueryId);
         } else {
-          this.closeStatement(sessionId, statementId, releaseByQueryId);
+          this.closeStatement(session, statementId, releaseByQueryId);
         }
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
@@ -194,13 +202,13 @@ public class SessionManager {
   }
 
   public TSStatus closeOperation(
-      long sessionId,
+      IClientSession session,
       long queryId,
       long statementId,
       boolean haveStatementId,
       boolean haveSetQueryId) {
     return closeOperation(
-        sessionId,
+        session,
         queryId,
         statementId,
         haveStatementId,
@@ -213,39 +221,25 @@ public class SessionManager {
    *
    * @return true: If logged in; false: If not logged in
    */
-  public boolean checkLogin(long sessionId) {
-    Long currentSessionId = getCurrSessionId();
-    boolean isLoggedIn = currentSessionId != null && currentSessionId == sessionId;
+  public boolean checkLogin(IClientSession session) {
+    boolean isLoggedIn = session != null && session.isLogin();
+
     if (!isLoggedIn) {
       LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
     } else {
-      SessionTimeoutManager.getInstance().refresh(sessionId);
+      SessionTimeoutManager.getInstance().refresh(session);
     }
     return isLoggedIn;
   }
 
-  public long requestSessionId(
-      String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
-    long sessionId = sessionIdGenerator.incrementAndGet();
-
-    currSessionId.set(sessionId);
-    sessionIdToUsername.put(sessionId, username);
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
-    sessionIdToClientVersion.put(sessionId, clientVersion);
-    sessionIdToSessionInfo.put(sessionId, new SessionInfo(sessionId, username, zoneId));
-
-    return sessionId;
+  public boolean releaseSessionResource(IClientSession session) {
+    return releaseSessionResource(session, this::releaseQueryResourceNoExceptions);
   }
 
-  public boolean releaseSessionResource(long sessionId) {
-    return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
-  }
+  public boolean releaseSessionResource(
+      IClientSession session, Consumer<Long> releaseQueryResource) {
+    Set<Long> statementIdSet = session.getStatementIds();
 
-  public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
-    sessionIdToZoneId.remove(sessionId);
-    sessionIdToClientVersion.remove(sessionId);
-
-    Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
     if (statementIdSet != null) {
       for (Long statementId : statementIdSet) {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -255,44 +249,43 @@ public class SessionManager {
           }
         }
       }
+      return true;
     }
 
-    return sessionIdToUsername.remove(sessionId) != null;
+    // TODO if there is no statement for the session, how to return (true or false?)
+    return false;
   }
 
-  public long getSessionIdByQueryId(long queryId) {
+  public IClientSession getSessionIdByQueryId(long queryId) {
     // TODO: make this more efficient with a queryId -> sessionId map
     for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
       if (statementToQueries.getValue().contains(queryId)) {
-        for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
-          if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
-            return sessionToStatements.getKey();
+        Long statementId = statementToQueries.getKey();
+        for (IClientSession session : sessions.keySet()) {
+          if (session.getStatementIds().contains(statementId)) {
+            return session;
           }
         }
       }
     }
-    return -1;
+    return null;
   }
 
-  public long requestStatementId(long sessionId) {
+  public long requestStatementId(IClientSession session) {
     long statementId = statementIdGenerator.incrementAndGet();
-    sessionIdToStatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
-        .add(statementId);
+    session.getStatementIds().add(statementId);
     return statementId;
   }
 
-  public void closeStatement(long sessionId, long statementId, Consumer<Long> releaseByQueryId) {
+  public void closeStatement(
+      IClientSession session, long statementId, Consumer<Long> releaseByQueryId) {
     Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
     if (queryIdSet != null) {
       for (Long queryId : queryIdSet) {
         releaseByQueryId.accept(queryId);
       }
     }
-
-    if (sessionIdToStatementId.containsKey(sessionId)) {
-      sessionIdToStatementId.get(sessionId).remove(statementId);
-    }
+    session.getStatementIds().remove(statementId);
   }
 
   public long requestQueryId(Long statementId, boolean isDataQuery) {
@@ -340,9 +333,9 @@ public class SessionManager {
   }
 
   /** Check whether specific Session has the authorization to given plan. */
-  public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+  public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
     try {
-      if (!checkAuthorization(plan, getUsername(sessionId))) {
+      if (!checkAuthorization(plan, session.getUsername())) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation, please add privilege "
@@ -359,41 +352,74 @@ public class SessionManager {
     return null;
   }
 
-  public Long getCurrSessionId() {
-    return currSessionId.get();
+  /**
+   * this method can be only used in client-thread model.
+   *
+   * @return
+   */
+  public IClientSession getCurrSession() {
+    return currSession.get();
   }
 
-  public TimeZone getCurrSessionTimeZone() {
-    if (getCurrSessionId() != null) {
-      return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+  public TimeZone getSessionTimeZone() {
+    IClientSession session = currSession.get();
+    if (session != null) {
+      return session.getTimeZone();
     } else {
       // only used for test
       return TimeZone.getTimeZone("+08:00");
     }
   }
 
-  public String getUsername(Long sessionId) {
-    String username = sessionIdToUsername.get(sessionId);
-    if (username == null) {
-      throw new RuntimeException(
-          new IoTDBException(
-              "session expired, please re-login.", TSStatusCode.SESSION_EXPIRED.getStatusCode()));
-    }
-    return username;
+  /**
+   * this method can be only used in client-thread model. But, in message-thread model based
+   * service, calling this method has no side effect. <br>
+   * MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this
+   * method in thrift's event handler.
+   *
+   * @return
+   */
+  public void removeCurrSession() {
+    IClientSession session = currSession.get();
+    sessions.remove(session);
+    currSession.remove();
   }
 
-  public ZoneId getZoneId(Long sessionId) {
-    ZoneId zoneId = sessionIdToZoneId.get(sessionId);
-    if (zoneId == null) {
-      throw new RuntimeException(
-          new IoTDBException(
-              "session expired, please re-login.", TSStatusCode.SESSION_EXPIRED.getStatusCode()));
+  /**
+   * this method can be only used in client-thread model. Do not use this method in message-thread
+   * model based service.
+   *
+   * @param session
+   * @return false if the session has been initialized.
+   */
+  public boolean registerSession(IClientSession session) {
+    if (this.currSession.get() != null) {
+      LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
+      return false;
     }
-    return zoneId;
+    this.currSession.set(session);
+    sessions.put(session, placeHolder);
+    return true;
   }
 
-  public void setTimezone(Long sessionId, String zone) {
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+  /**
+   * must be called after registerSession(); marks the session as logged in.
+   *
+   * @param username
+   * @param zoneId
+   * @param clientVersion
+   */
+  public void supplySession(
+      IClientSession session,
+      String username,
+      String zoneId,
+      IoTDBConstant.ClientVersion clientVersion) {
+    session.setId(sessionIdGenerator.incrementAndGet());
+    session.setUsername(username);
+    session.setZoneId(ZoneId.of(zoneId));
+    session.setClientVersion(clientVersion);
+    session.setLogin(true);
+    session.setLogInTime(System.currentTimeMillis());
   }
 
   public boolean hasDataset(Long queryId) {
@@ -408,10 +434,6 @@ public class SessionManager {
     queryIdToDataSet.put(queryId, dataSet);
   }
 
-  public void removeDataset(Long queryId) {
-    queryIdToDataSet.remove(queryId);
-  }
-
   public void closeDataset(Long statementId, Long queryId, Consumer<Long> releaseByQueryId) {
     releaseByQueryId.accept(queryId);
     if (statementIdToQueryId.containsKey(statementId)) {
@@ -419,16 +441,27 @@ public class SessionManager {
     }
   }
 
-  public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) {
-    return sessionIdToClientVersion.get(sessionId);
-  }
-
   public static SessionManager getInstance() {
     return SessionManagerHelper.INSTANCE;
   }
 
-  public SessionInfo getSessionInfo(long sessionId) {
-    return sessionIdToSessionInfo.get(sessionId);
+  public SessionInfo getSessionInfo(IClientSession session) {
+    return new SessionInfo(session.getId(), session.getUsername(), session.getZoneId().getId());
+  }
+
+  @Override
+  public Set<String> getAllRpcClients() {
+    return this.sessions.keySet().stream()
+        .map(IClientSession::toString)
+        .collect(Collectors.toSet());
+  }
+
+  public TSConnectionInfoResp getAllConnectionInfo() {
+    return new TSConnectionInfoResp(
+        sessions.keySet().stream()
+            .map(IClientSession::convertToTSConnectionInfo)
+            .sorted(Comparator.comparingLong(TSConnectionInfo::getLogInTime))
+            .collect(Collectors.toList()));
   }
 
   private static class SessionManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java
new file mode 100644
index 0000000000..74adfc2752
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManagerMBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import java.util.Set;
+
+public interface SessionManagerMBean {
+
+  /**
+   * @return client's reqId-username:ip:port <br>
+   *     reqId may be deprecated in 0.14
+   */
+  Set<String> getAllRpcClients();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index cb5003dc74..be97de1558 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.query.control;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 
 import org.slf4j.Logger;
@@ -38,7 +40,7 @@ public class SessionTimeoutManager {
   private static final long SESSION_TIMEOUT =
       IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
 
-  private Map<Long, Long> sessionIdToLastActiveTime;
+  private Map<IClientSession, Long> sessionToLastActiveTime;
   private ScheduledExecutorService executorService;
 
   private SessionTimeoutManager() {
@@ -46,14 +48,14 @@ public class SessionTimeoutManager {
       return;
     }
 
-    this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
+    this.sessionToLastActiveTime = new ConcurrentHashMap<>();
     this.executorService =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
 
     ScheduledExecutorUtil.safelyScheduleAtFixedRate(
         executorService,
         () -> {
-          if (!sessionIdToLastActiveTime.isEmpty()) {
+          if (!sessionToLastActiveTime.isEmpty()) {
             LOGGER.info("cleaning up expired sessions");
             cleanup();
           }
@@ -63,37 +65,42 @@ public class SessionTimeoutManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void register(long sessionId) {
+  public void register(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
+    sessionToLastActiveTime.put(session, System.currentTimeMillis());
   }
 
-  public boolean unregister(long sessionId) {
+  /**
+   * unregister the session and release all its query resources.
+   *
+   * @param session
+   * @return true if removing successfully, false otherwise (e.g., the session does not exist)
+   */
+  public boolean unregister(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
-      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId);
+      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session);
     }
 
-    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) {
-      return sessionIdToLastActiveTime.remove(sessionId) != null;
+    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) {
+      return sessionToLastActiveTime.remove(session) != null;
     }
 
     return false;
   }
 
-  public void refresh(long sessionId) {
+  public void refresh(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
-
-    sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
+    sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis());
   }
 
   private void cleanup() {
     long currentTime = System.currentTimeMillis();
-    sessionIdToLastActiveTime.entrySet().stream()
+    sessionToLastActiveTime.entrySet().stream()
         .filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
         .forEach(
             entry -> {
@@ -102,6 +109,12 @@ public class SessionTimeoutManager {
                     "session-{} timed out in {} ms",
                     entry.getKey(),
                     currentTime - entry.getValue());
+                // close the socket.
+                // currently, we only focus on RPC service.
+                // TODO do we need to consider MQTT ClientSession?
+                if (entry.getKey() instanceof ClientSession) {
+                  ((ClientSession) entry.getKey()).shutdownStream();
+                }
               }
             });
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
new file mode 100644
index 0000000000..0622715787
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/** Client Session is the only identity for a connection. */
+public class ClientSession extends IClientSession {
+
+  private final Socket clientSocket;
+
+  // TODO why we use copyOnWriteArraySet instead of HashSet??
+  private final Set<Long> statements = new CopyOnWriteArraySet<>();
+
+  public ClientSession(Socket clientSocket) {
+    this.clientSocket = clientSocket;
+  }
+
+  @Override
+  public String getClientAddress() {
+    return clientSocket.getInetAddress().getHostAddress();
+  }
+
+  @Override
+  int getClientPort() {
+    return clientSocket.getPort();
+  }
+
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.THRIFT_BASED;
+  }
+
+  @Override
+  String getConnectionId() {
+    return String.format("%s:%s", getClientAddress(), getClientPort());
+  }
+
+  @Override
+  public Set<Long> getStatementIds() {
+    return statements;
+  }
+
+  /**
+   * shutdownStream will close the socket stream directly, which causes a TTransportException with
+   * type = TTransportException.END_OF_FILE. In this case, the thrift client thread will finish
+   * asap.
+   */
+  public void shutdownStream() {
+    if (!clientSocket.isInputShutdown()) {
+      try {
+        clientSocket.shutdownInput();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    if (!clientSocket.isOutputShutdown()) {
+      try {
+        clientSocket.shutdownOutput();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
new file mode 100644
index 0000000000..31ce04bf60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.TimeZone;
+
+public abstract class IClientSession {
+
+  /** id is only used to keep compatibility with v0.13 */
+  @Deprecated private long id;
+
+  private ClientVersion clientVersion;
+
+  private ZoneId zoneId;
+
+  // TODO: why some Statement Plans use timeZone while others use ZoneId?
+  private TimeZone timeZone;
+
+  private String username;
+
+  private boolean login = false;
+
+  private long logInTime;
+
+  abstract String getClientAddress();
+
+  abstract int getClientPort();
+
+  abstract TSConnectionType getConnectionType();
+
+  /** ip:port for thrift-based service and client id for mqtt-based service */
+  abstract String getConnectionId();
+
+  public void setClientVersion(ClientVersion clientVersion) {
+    this.clientVersion = clientVersion;
+  }
+
+  public ClientVersion getClientVersion() {
+    return this.clientVersion;
+  }
+
+  public ZoneId getZoneId() {
+    return this.zoneId;
+  }
+
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+    this.timeZone = TimeZone.getTimeZone(zoneId);
+  }
+
+  public TimeZone getTimeZone() {
+    return timeZone;
+  }
+
+  public void setTimeZone(TimeZone timeZone) {
+    this.timeZone = timeZone;
+    this.zoneId = timeZone.toZoneId();
+  }
+
+  public String getUsername() {
+    return this.username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public boolean isLogin() {
+    return login;
+  }
+
+  public void setLogin(boolean login) {
+    this.login = login;
+  }
+
+  public void setLogInTime(long logInTime) {
+    this.logInTime = logInTime;
+  }
+
+  public long getLogInTime() {
+    return logInTime;
+  }
+
+  @Deprecated
+  public long getId() {
+    return id;
+  }
+
+  @Deprecated
+  public void setId(long id) {
+    this.id = id;
+  }
+
+  public String toString() {
+    return String.format(
+        "%d-%s:%s:%d", getId(), getUsername(), getClientAddress(), getClientPort());
+  }
+
+  public TSConnectionInfo convertToTSConnectionInfo() {
+    return new TSConnectionInfo(
+        getUsername(), getLogInTime(), getConnectionId(), getConnectionType());
+  }
+
+  /**
+   * statementIds that this client opens.<br>
+   * For JDBC clients, each Statement instance has a statement id.<br>
+   * For an IoTDBSession connection, each connection has a statement id.<br>
+   * mqtt clients have no statement id.
+   */
+  public abstract Set<Long> getStatementIds();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java
new file mode 100644
index 0000000000..37c9cf1735
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/InternalClientSession.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Client session for internal usage, like CQ and Select Into. The client id is also reported
+ * as the client address and the connection id.
+ */
+public class InternalClientSession extends IClientSession {
+
+  // For CQ, it will be cq_id
+  // For Select Into, it will be SELECT_INTO constant string
+  private final String clientID;
+
+  // TODO why we use copyOnWriteArraySet instead of HashSet??
+  private final Set<Long> statements = new CopyOnWriteArraySet<>();
+
+  public InternalClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  public String getClientID() {
+    return clientID;
+  }
+
+  /** Returns the client id, which stands in for the client address. */
+  @Override
+  public String getClientAddress() {
+    return clientID;
+  }
+
+  /** Always returns 0. */
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.INTERNAL;
+  }
+
+  /** Returns the client id, which is also used as the connection id. */
+  @Override
+  String getConnectionId() {
+    return clientID;
+  }
+
+  /** Formats this session as {@code <id>-<clientID>}. */
+  @Override
+  public String toString() {
+    return String.format("%d-%s", getId(), getClientID());
+  }
+
+  /** Returns the backing set itself, not a copy. */
+  @Override
+  public Set<Long> getStatementIds() {
+    return statements;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
new file mode 100644
index 0000000000..a792c6f00f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Client session for an MQTT client, identified by its client id. */
+public class MqttClientSession extends IClientSession {
+
+  private final String clientID;
+
+  public MqttClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  public String getClientID() {
+    return clientID;
+  }
+
+  /** Returns the client id, which stands in for the client address. */
+  @Override
+  public String getClientAddress() {
+    return clientID;
+  }
+
+  /** Always returns 0. */
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.MQTT_BASED;
+  }
+
+  /** Returns the client id, which is also used as the connection id. */
+  @Override
+  String getConnectionId() {
+    return clientID;
+  }
+
+  /** Formats this session as {@code <id>-<clientID>}. */
+  @Override
+  public String toString() {
+    return String.format("%d-%s", getId(), getClientID());
+  }
+
+  /** Always returns an immutable empty set. */
+  @Override
+  public Set<Long> getStatementIds() {
+    return Collections.emptySet();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 38ab341e45..f4be67eda6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -142,7 +142,7 @@ public abstract class IFill {
 
   protected long slideMonth(long startTime, int monthNum) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     calendar.add(Calendar.MONTH, monthNum);
     return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
new file mode 100644
index 0000000000..7366d2b47d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/BaseServerContextHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service.thrift.handler;
+
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.Socket;
+import java.util.ServiceLoader;
+
+/**
+ * Base connect/disconnect handling for thrift server event handlers. It registers a {@link
+ * ClientSession} when a connection context is created, removes the current session when the
+ * context is deleted, and notifies the {@link JudgableServerContext} produced by an optional
+ * {@link ServerContextFactory} that is looked up through {@link ServiceLoader}.
+ */
+public class BaseServerContextHandler {
+  // remains null when ServiceLoader finds no implementation
+  private static ServerContextFactory factory = null;
+  private static final Logger logger = LoggerFactory.getLogger(BaseServerContextHandler.class);
+
+  static {
+    ServiceLoader<ServerContextFactory> contextFactoryLoader =
+        ServiceLoader.load(ServerContextFactory.class);
+    for (ServerContextFactory loader : contextFactoryLoader) {
+      if (factory != null) {
+        // it means there is more than one implementation.
+        logger.warn("There are more than one ServerContextFactory implementation. pls check.");
+      }
+      logger.info("Will set ServerContextFactory from {} ", loader.getClass().getName());
+      factory = loader;
+    }
+  }
+
+  public BaseServerContextHandler() {}
+
+  /**
+   * Registers a {@link ClientSession} for the new connection and, when a {@link
+   * ServerContextFactory} was loaded, creates its context and calls {@code whenConnect()} on it.
+   *
+   * @param in input protocol of the connection (unused)
+   * @param out output protocol; its transport is cast to {@link TElasticFramedTransport}, whose
+   *     socket is cast to {@link TSocket}
+   * @return the context created by the factory, or {@code null} when no factory was loaded
+   */
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    Socket socket =
+        ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    getSessionManager().registerSession(new ClientSession(socket));
+    if (factory == null) {
+      return null;
+    }
+    JudgableServerContext context = factory.newServerContext(out, socket);
+    // NOTE(review): the context is returned whatever whenConnect() reports, as it always was;
+    // confirm whether a false result is meant to reject the connection.
+    context.whenConnect();
+    return context;
+  }
+
+  /**
+   * Removes the current session and, when a factory was loaded and the context is a {@link
+   * JudgableServerContext}, calls {@code whenDisconnect()} on it.
+   *
+   * @param context the server context of the closing connection; may be {@code null}
+   * @param in input protocol of the connection (unused)
+   * @param out output protocol of the connection (unused)
+   */
+  public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
+    getSessionManager().removeCurrSession();
+    // instanceof rejects null and any context that is not a JudgableServerContext
+    if (factory != null && context instanceof JudgableServerContext) {
+      ((JudgableServerContext) context).whenDisconnect();
+    }
+  }
+
+  /** Returns the {@link SessionManager} instance used to register and remove sessions. */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
index 031dcb1e49..35bdb99c8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InfluxDBServiceThriftHandler.java
@@ -26,7 +26,8 @@ import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
 
-public class InfluxDBServiceThriftHandler implements TServerEventHandler {
+public class InfluxDBServiceThriftHandler extends BaseServerContextHandler
+    implements TServerEventHandler {
   private final IInfluxDBServiceWithHandler impl;
 
   public InfluxDBServiceThriftHandler(IInfluxDBServiceWithHandler impl) {
@@ -39,16 +40,16 @@ public class InfluxDBServiceThriftHandler implements TServerEventHandler {
   }
 
   @Override
-  public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+  public ServerContext createContext(TProtocol in, TProtocol out) {
     // nothing
-    return null;
+    return super.createContext(in, out);
   }
 
   @Override
-  public void deleteContext(
-      ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
+  public void deleteContext(ServerContext serverContext, TProtocol in, TProtocol out) {
     // release resources.
     impl.handleClientExit();
+    super.deleteContext(serverContext, in, out);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 05c185bf59..a6f25d8df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -26,9 +26,10 @@ import org.apache.thrift.transport.TTransport;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class RPCServiceThriftHandler implements TServerEventHandler {
+public class RPCServiceThriftHandler extends BaseServerContextHandler
+    implements TServerEventHandler {
 
-  private AtomicLong thriftConnectionNumber = new AtomicLong(0);
+  private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
   private final IClientRPCServiceWithHandler eventHandler;
 
   public RPCServiceThriftHandler(IClientRPCServiceWithHandler eventHandler) {
@@ -38,16 +39,17 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+  public ServerContext createContext(TProtocol in, TProtocol out) {
     thriftConnectionNumber.incrementAndGet();
-    return null;
+    return super.createContext(in, out);
   }
 
   @Override
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+  public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) {
     // release query resources.
     eventHandler.handleClientExit();
     thriftConnectionNumber.decrementAndGet();
+    super.deleteContext(arg0, in, out);
   }
 
   @Override
@@ -59,4 +61,11 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
   public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
     // nothing
   }
+
+  /*
+   * The SessionManager instance is provided by the inherited
+   * BaseServerContextHandler#getSessionManager().
+   *
+   * In v0.13, cluster mode uses a different SessionManager instance...
+   */
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 96ca1fdcf0..cdc33165e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Operation;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
@@ -59,14 +58,13 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.rpc.ConfigNodeConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
@@ -74,6 +72,7 @@ import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -129,6 +128,7 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
+import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -161,51 +161,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
-    TSStatus loginStatus;
-    try {
-      loginStatus = AuthorizerManager.getInstance().checkUser(req.username, req.password);
-    } catch (ConfigNodeConnectionException e) {
-      TSStatus tsStatus = RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR, e.getMessage());
-      return new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
-    }
-    BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
-    long sessionId = -1;
-    if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // check the version compatibility
-      boolean compatible = req.client_protocol.equals(SessionManager.CURRENT_RPC_VERSION);
-      if (!compatible) {
-        openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
-        openSessionResp.setMessage(
-            "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
-        openSessionResp = openSessionResp.sessionId(sessionId);
-      } else {
-        openSessionResp.setCode(loginStatus.getCode());
-        openSessionResp.setMessage(loginStatus.getMessage());
-
-        sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
-
-        LOGGER.info(
-            "{}: Login status: {}. User : {}, opens Session-{}",
-            IoTDBConstant.GLOBAL_DB_NAME,
-            openSessionResp.getMessage(),
+    BasicOpenSessionResp openSessionResp =
+        SESSION_MANAGER.login(
+            SESSION_MANAGER.getCurrSession(),
             req.username,
-            sessionId);
-
-        SessionTimeoutManager.getInstance().register(sessionId);
-        openSessionResp = openSessionResp.sessionId(sessionId);
-      }
-    } else {
-      openSessionResp.setMessage(loginStatus.getMessage());
-      openSessionResp.setCode(loginStatus.getCode());
-
-      sessionId = SESSION_MANAGER.requestSessionId(req.username, req.zoneId, clientVersion);
-      SessionManager.AUDIT_LOGGER.info(
-          "User {} opens Session failed with an incorrect password", req.username);
-
-      SessionTimeoutManager.getInstance().register(sessionId);
-      openSessionResp = openSessionResp.sessionId(sessionId);
-    }
-
+            req.password,
+            req.zoneId,
+            req.client_protocol,
+            clientVersion);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
@@ -221,9 +184,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
-    SESSION_MANAGER.releaseSessionResource(req.sessionId, this::cleanupQueryExecution);
+    SESSION_MANAGER.releaseSessionResource(
+        SESSION_MANAGER.getCurrSession(), this::cleanupQueryExecution);
     return new TSStatus(
-        !SESSION_MANAGER.closeSession(req.sessionId)
+        !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -237,7 +201,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
     return SESSION_MANAGER.closeOperation(
-        req.sessionId,
+        SESSION_MANAGER.getCurrSession(),
         req.queryId,
         req.statementId,
         req.isSetStatementId(),
@@ -248,7 +212,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -263,7 +227,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
@@ -301,20 +265,21 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
-            "Session-{} create storage group {}", SESSION_MANAGER.getCurrSessionId(), storageGroup);
+            "Session-{} create storage group {}", SESSION_MANAGER.getCurrSession(), storageGroup);
       }
 
       // Step 1: Create SetStorageGroupStatement
       SetStorageGroupStatement statement =
           (SetStorageGroupStatement) StatementGenerator.createStatement(storageGroup);
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -325,7 +290,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -342,13 +307,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSession(), req.getPath());
       }
 
       // measurementAlias is also a nodeName
@@ -358,7 +323,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           (CreateTimeSeriesStatement) StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -369,7 +335,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -386,14 +352,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getPrefixPath(),
             req.getMeasurements());
       }
@@ -408,7 +374,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -419,7 +386,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -436,14 +403,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -456,7 +423,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -467,7 +435,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -484,7 +452,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteTimeseries(long sessionId, List<String> path) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -493,7 +461,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           StatementGenerator.createDeleteTimeSeriesStatement(path);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -504,7 +473,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -521,14 +490,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} delete {} storage groups, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             storageGroups.size(),
             storageGroups.get(0));
       }
@@ -538,7 +507,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           (DeleteStorageGroupStatement) StatementGenerator.createStatement(storageGroups);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -549,7 +519,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -571,7 +541,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
     String statement = req.getStatement();
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
 
@@ -579,7 +549,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     try {
       Statement s =
           StatementGenerator.createStatement(
-              statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+              statement, SESSION_MANAGER.getCurrSession().getZoneId());
 
       if (s == null) {
         return RpcUtils.getTSExecuteStatementResp(
@@ -587,7 +557,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
                 TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
       }
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return RpcUtils.getTSExecuteStatementResp(status);
       }
@@ -595,13 +565,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement);
 
-      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
               s,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               statement,
               PARTITION_FETCHER,
               SCHEMA_FETCHER,
@@ -645,7 +615,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     long t1 = System.currentTimeMillis();
     List<TSStatus> results = new ArrayList<>();
     boolean isAllSuccessful = true;
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
@@ -654,13 +624,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       try {
         Statement s =
             StatementGenerator.createStatement(
-                statement, SESSION_MANAGER.getZoneId(req.getSessionId()));
+                statement, SESSION_MANAGER.getCurrSession().getZoneId());
         if (s == null) {
           return RpcUtils.getStatus(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported");
         }
         // permission check
-        TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+        TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return status;
         }
@@ -675,7 +645,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
             COORDINATOR.execute(
                 s,
                 queryId,
-                SESSION_MANAGER.getSessionInfo(req.sessionId),
+                SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
                 statement,
                 PARTITION_FETCHER,
                 SCHEMA_FETCHER,
@@ -712,7 +682,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
@@ -744,14 +714,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertRecords(TSInsertRecordsReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.prefixPaths.get(0),
             req.getTimestamps().get(0));
       }
@@ -767,7 +737,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -778,7 +749,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -798,14 +769,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.prefixPath,
             req.getTimestamps().get(0));
       }
@@ -822,7 +793,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -833,7 +805,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -853,14 +825,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session {} insertRecords, device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.prefixPath,
             req.getTimestamps().get(0));
       }
@@ -877,7 +849,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -888,7 +861,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -911,13 +884,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertRecord(TSInsertRecordReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -931,7 +904,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -942,7 +916,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -962,7 +936,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -977,7 +951,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -988,7 +963,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1008,7 +983,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertTablet(TSInsertTabletReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1023,7 +998,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1034,7 +1010,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1054,14 +1030,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session {} insertRecords, first device {}, first time {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.prefixPaths.get(0),
             req.getTimestamps().get(0));
       }
@@ -1076,7 +1052,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1086,7 +1063,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1147,14 +1124,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       DeleteDataStatement statement = StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1164,7 +1142,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1180,29 +1158,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
     try {
       Statement s =
-          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return RpcUtils.getTSExecuteStatementResp(status);
       }
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req);
-      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
               s,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER,
@@ -1241,29 +1219,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
     try {
       Statement s =
-          StatementGenerator.createStatement(req, SESSION_MANAGER.getZoneId(req.getSessionId()));
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return RpcUtils.getTSExecuteStatementResp(status);
       }
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
       AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req);
-      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, true);
+      long queryId = SESSION_MANAGER.requestQueryId(req.statementId, false);
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
               s,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER,
@@ -1303,20 +1281,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(sessionId);
+    return SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
   }
 
   @Override
   public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getName());
       }
 
@@ -1324,7 +1302,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       CreateSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1335,7 +1314,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1364,7 +1343,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
     TSQueryTemplateResp resp = new TSQueryTemplateResp();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         resp.setStatus(getNotLoggedInStatus());
         return resp;
       }
@@ -1405,7 +1384,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     long startTime = System.currentTimeMillis();
     try {
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         resp.setStatus(status);
         return resp;
@@ -1420,7 +1400,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               null,
               PARTITION_FETCHER,
               SCHEMA_FETCHER,
@@ -1471,14 +1451,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} set schema template {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getTemplateName(),
             req.getPrefixPath());
       }
@@ -1488,7 +1468,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       SetSchemaTemplateStatement statement = StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1499,7 +1480,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1539,17 +1520,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return SyncService.getInstance().transportFile(metaInfo, buff);
   }
 
+  @Override
+  public TSConnectionInfoResp fetchAllConnectionsInfo() throws TException {
+    return SESSION_MANAGER.getAllConnectionInfo();
+  }
+
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1559,7 +1545,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
 
       // permission check
-      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      TSStatus status =
+          AuthorityChecker.checkAuthority(statement, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
@@ -1570,7 +1557,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
           COORDINATOR.execute(
               statement,
               queryId,
-              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
               "",
               PARTITION_FETCHER,
               SCHEMA_FETCHER);
@@ -1622,9 +1609,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public void handleClientExit() {
-    Long sessionId = SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      TSCloseSessionReq req = new TSCloseSessionReq();
       closeSession(req);
     }
     SyncService.getInstance().handleClientExit();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index 8209b43487..58406c62c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -37,9 +37,9 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
-import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.utils.DataTypeUtils;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq;
 import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq;
@@ -77,8 +77,12 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
   @Override
   public InfluxOpenSessionResp openSession(InfluxOpenSessionReq req) throws TException {
     BasicOpenSessionResp basicOpenSessionResp =
-        SESSION_MANAGER.openSession(
-            req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        SESSION_MANAGER.login(
+            SESSION_MANAGER.getCurrSession(),
+            req.username,
+            req.password,
+            req.zoneId,
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
     return new InfluxOpenSessionResp()
         .setStatus(
             RpcUtils.getInfluxDBStatus(
@@ -89,14 +93,14 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
   @Override
   public InfluxTSStatus closeSession(InfluxCloseSessionReq req) {
     return new InfluxTSStatus(
-        !SESSION_MANAGER.closeSession(req.sessionId)
+        !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
             ? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
   @Override
   public InfluxTSStatus writePoints(InfluxWritePointsReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
@@ -108,7 +112,7 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
 
       try {
         InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
-        InfluxTSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
+        InfluxTSStatus tsStatus = executeNonQueryPlan(plan);
         if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
           executeCode = tsStatus.getCode();
@@ -127,13 +131,13 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
 
   @Override
   public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.sessionId)) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
     try {
       SetStorageGroupPlan setStorageGroupPlan =
           new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
-      return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId());
+      return executeNonQueryPlan(setStorageGroupPlan);
     } catch (IllegalPathException
         | QueryProcessException
         | StorageGroupNotSetException
@@ -156,9 +160,9 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
 
   @Override
   public void handleClientExit() {
-    Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      closeSession(new InfluxCloseSessionReq(sessionId));
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      closeSession(new InfluxCloseSessionReq());
     }
   }
 
@@ -168,10 +172,10 @@ public class InfluxDBServiceImpl implements IInfluxDBServiceWithHandler {
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
 
-  private InfluxTSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId)
+  private InfluxTSStatus executeNonQueryPlan(PhysicalPlan plan)
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     org.apache.iotdb.common.rpc.thrift.TSStatus status =
-        SESSION_MANAGER.checkAuthority(plan, sessionId);
+        SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
     if (status == null) {
       status =
           IoTDB.serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index aca7664144..3258756386 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -90,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -174,7 +176,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
     private final PhysicalPlan plan;
     private final long queryStartTime;
-    private final long sessionId;
+    private final IClientSession session;
     private final String statement;
     private final long statementId;
     private final long timeout;
@@ -190,7 +192,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     public QueryTask(
         PhysicalPlan plan,
         long queryStartTime,
-        long sessionId,
+        IClientSession session,
         String statement,
         long statementId,
         long timeout,
@@ -199,7 +201,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         boolean enableRedirectQuery) {
       this.plan = plan;
       this.queryStartTime = queryStartTime;
-      this.sessionId = sessionId;
+      this.session = session;
       this.statement = statement;
       this.statementId = statementId;
       this.timeout = timeout;
@@ -210,11 +212,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
     @Override
     public TSExecuteStatementResp call() throws Exception {
-      String username = SESSION_MANAGER.getUsername(sessionId);
+      String username = session.getUsername();
       plan.setLoginUserName(username);
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
-      AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
 
       final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
       QueryContext context =
@@ -248,13 +250,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   private class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
-    private final long sessionId;
+    private final IClientSession session;
     private final long queryId;
     private final int fetchSize;
     private final boolean isAlign;
 
-    public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) {
-      this.sessionId = sessionId;
+    public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
+      this.session = session;
       this.queryId = queryId;
       this.fetchSize = fetchSize;
       this.isAlign = isAlign;
@@ -266,8 +268,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try {
         if (isAlign) {
-          TSQueryDataSet result =
-              fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+          TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = result.bufferForTime().limit() != 0;
           if (!hasResultSet) {
             SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
@@ -277,8 +278,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
           resp.setIsAlign(true);
         } else {
           TSQueryNonAlignDataSet nonAlignResult =
-              fillRpcNonAlignReturnData(
-                  fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+              fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = false;
           for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
             if (timeBuffer.limit() != 0) {
@@ -319,8 +319,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
     BasicOpenSessionResp openSessionResp =
-        SESSION_MANAGER.openSession(
-            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+        SESSION_MANAGER.login(
+            SESSION_MANAGER.getCurrSession(),
+            req.username,
+            req.password,
+            req.zoneId,
+            req.client_protocol,
+            clientVersion);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
@@ -337,7 +342,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
-        !SESSION_MANAGER.closeSession(req.sessionId)
+        !SESSION_MANAGER.closeSession(SESSION_MANAGER.getCurrSession())
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -351,14 +356,18 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
     return SESSION_MANAGER.closeOperation(
-        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+        SESSION_MANAGER.getCurrSession(),
+        req.queryId,
+        req.statementId,
+        req.isSetStatementId(),
+        req.isSetQueryId());
   }
 
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSFetchMetadataResp resp = new TSFetchMetadataResp();
 
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return resp.setStatus(getNotLoggedInStatus());
     }
 
@@ -504,7 +513,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     long t1 = System.currentTimeMillis();
     List<TSStatus> result = new ArrayList<>();
     boolean isAllSuccessful = true;
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
@@ -521,8 +530,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
                 .getPlanner()
                 .parseSQLToPhysicalPlan(
                     statement,
-                    SESSION_MANAGER.getZoneId(req.sessionId),
-                    SESSION_MANAGER.getClientVersion(req.sessionId));
+                    SESSION_MANAGER.getSessionTimeZone().toZoneId(),
+                    SESSION_MANAGER.getCurrSession().getClientVersion());
         if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -536,7 +545,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
             index = 0;
           }
 
-          TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status =
+              SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
           if (status != null) {
             insertRowsPlan.getResults().put(index, status);
             isAllSuccessful = false;
@@ -558,7 +568,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
             multiPlan = new CreateMultiTimeSeriesPlan();
             executeList.add(multiPlan);
           }
-          TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status =
+              SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
           if (status != null) {
             multiPlan.getResults().put(i, status);
             isAllSuccessful = false;
@@ -583,7 +594,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
             executeList.clear();
           }
           long t2 = System.currentTimeMillis();
-          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId());
+          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan);
           addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
           result.add(resp.status);
           if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -610,7 +621,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
     String statement = req.getStatement();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -620,8 +631,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   statement,
-                  SESSION_MANAGER.getZoneId(req.getSessionId()),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  SESSION_MANAGER.getCurrSession().getZoneId(),
+                  SESSION_MANAGER.getCurrSession().getClientVersion());
 
       if (physicalPlan.isQuery()) {
         return submitQueryTask(physicalPlan, startTime, req);
@@ -648,7 +659,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -659,8 +670,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  SESSION_MANAGER.getCurrSession().getZoneId(),
+                  SESSION_MANAGER.getCurrSession().getClientVersion());
 
       if (physicalPlan.isQuery()) {
         return submitQueryTask(physicalPlan, startTime, req);
@@ -684,7 +695,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -694,8 +705,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               .getPlanner()
               .rawDataQueryReqToPhysicalPlan(
                   req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  SESSION_MANAGER.getCurrSession().getZoneId(),
+                  SESSION_MANAGER.getCurrSession().getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -704,7 +715,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        SESSION_MANAGER.getCurrSession(),
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -730,7 +741,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
       }
 
@@ -740,8 +751,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               .getPlanner()
               .lastDataQueryReqToPhysicalPlan(
                   req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  SESSION_MANAGER.getCurrSession().getZoneId(),
+                  SESSION_MANAGER.getCurrSession().getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -750,7 +761,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        SESSION_MANAGER.getCurrSession(),
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -775,7 +786,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   private TSExecuteStatementResp submitQueryTask(
       PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
-    TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, req.getSessionId());
+    TSStatus status =
+        SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -783,7 +795,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         new QueryTask(
             physicalPlan,
             startTime,
-            req.sessionId,
+            SESSION_MANAGER.getCurrSession(),
             req.statement,
             req.statementId,
             req.timeout,
@@ -932,7 +944,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       long sessionId)
       throws IoTDBException, TException, SQLException, IOException, InterruptedException,
           QueryFilterOptimizationException {
-    TSStatus status = SESSION_MANAGER.checkAuthority(physicalPlan, sessionId);
+    TSStatus status =
+        SESSION_MANAGER.checkAuthority(physicalPlan, SESSION_MANAGER.getCurrSession());
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -947,7 +960,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
     QUERY_FREQUENCY_RECORDER.incrementAndGet();
     AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+        "Session {} execute select into: {}", SESSION_MANAGER.getCurrSession(), statement);
     if (queryPlan.isEnableTracing()) {
       TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
     }
@@ -965,7 +978,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         if (insertTabletPlans.isEmpty()) {
           continue;
         }
-        TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId);
+        TSStatus executionStatus = insertTabletsInternally(insertTabletPlans);
         if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
           return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId);
@@ -983,12 +996,12 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     }
   }
 
-  private TSStatus insertTabletsInternally(
-      List<InsertTabletPlan> insertTabletPlans, long sessionId) {
+  private TSStatus insertTabletsInternally(List<InsertTabletPlan> insertTabletPlans) {
     InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
     for (int i = 0; i < insertTabletPlans.size(); i++) {
       InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, sessionId);
+      TSStatus status =
+          SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
 
       if (status != null) {
         // not authorized
@@ -1004,7 +1017,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
@@ -1015,7 +1028,9 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
       Future<TSFetchResultsResp> resp =
           QueryTaskManager.getInstance()
-              .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign));
+              .submit(
+                  new FetchResultsTask(
+                      SESSION_MANAGER.getCurrSession(), req.queryId, req.fetchSize, req.isAlign));
       return resp.get();
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -1065,7 +1080,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   /** update statement can be: 1. select-into statement 2. non-query statement */
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
 
@@ -1075,8 +1090,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               .getPlanner()
               .parseSQLToPhysicalPlan(
                   req.statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  SESSION_MANAGER.getCurrSession().getZoneId(),
+                  SESSION_MANAGER.getCurrSession().getClientVersion());
       return physicalPlan.isQuery()
           ? RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1112,11 +1127,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
           QueryFilterOptimizationException {
     return plan.isSelectInto()
         ? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId)
-        : executeNonQueryStatement(plan, sessionId);
+        : executeNonQueryStatement(plan);
   }
 
-  private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+  private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan) {
+    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
     return status != null
         ? new TSExecuteStatementResp(status)
         : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
@@ -1125,9 +1140,9 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public void handleClientExit() {
-    Long sessionId = SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      TSCloseSessionReq req = new TSCloseSessionReq();
       closeSession(req);
     }
     SyncService.getInstance().handleClientExit();
@@ -1136,7 +1151,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1151,7 +1166,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
@@ -1188,14 +1203,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus insertRecords(TSInsertRecordsReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1213,7 +1228,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
                 req.getMeasurementsList().get(i).toArray(new String[0]),
                 req.valuesList.get(i),
                 req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
           allCheckSuccess = false;
@@ -1263,14 +1278,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1286,7 +1301,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               req.getMeasurementsList(),
               req.getValuesList(),
               req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       statusList.add(status != null ? status : executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
       statusList.add(
@@ -1311,14 +1326,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1336,7 +1351,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1371,14 +1386,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1396,7 +1411,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1487,13 +1502,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus insertRecord(TSInsertRecordReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1508,7 +1523,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               req.getMeasurements().toArray(new String[0]),
               req.values,
               req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
@@ -1521,13 +1536,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1542,7 +1557,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
@@ -1555,7 +1570,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1567,7 +1582,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         paths.add(new PartialPath(path));
       }
       plan.addPaths(paths);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
 
       return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
@@ -1582,7 +1597,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertTablet(TSInsertTabletReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1601,7 +1616,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
       insertTabletPlan.setAligned(req.isAligned);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status =
+          SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
 
       return status != null ? status : executeNonQueryPlan(insertTabletPlan);
     } catch (IoTDBException e) {
@@ -1618,7 +1634,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1665,7 +1681,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
     for (int i = 0; i < req.prefixPaths.size(); i++) {
       InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
-      TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status =
+          SESSION_MANAGER.checkAuthority(insertTabletPlan, SESSION_MANAGER.getCurrSession());
       if (status != null) {
         // not authorized
         insertMultiTabletsPlan.getResults().put(i, status);
@@ -1680,12 +1697,12 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1699,7 +1716,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1708,7 +1725,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         storageGroupList.add(new PartialPath(storageGroup));
       }
       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1721,13 +1738,13 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSession(), req.getPath());
       }
 
       // measurementAlias is also a nodeName
@@ -1742,7 +1759,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               req.tags,
               req.attributes,
               req.measurementAlias);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1755,7 +1772,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1768,7 +1785,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getPrefixPath(),
             req.getMeasurements());
       }
@@ -1796,7 +1813,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
               req.measurementAlias,
               req.tagsList,
               req.attributesList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1810,14 +1827,14 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1852,7 +1869,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
       for (int i = 0; i < req.paths.size(); i++) {
         plan.setPath(new PartialPath(req.paths.get(i)));
-        TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+        TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
         if (status != null) {
           // not authorized
           multiPlan.getResults().put(i, status);
@@ -1901,7 +1918,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
     try {
-      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
@@ -1910,7 +1927,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
         pathList.add(new PartialPath(path));
       }
       DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, sessionId);
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1922,20 +1939,20 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(sessionId);
+    return SESSION_MANAGER.requestStatementId(SESSION_MANAGER.getCurrSession());
   }
 
   @Override
   public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
     try {
-      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
         return getNotLoggedInStatus();
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            SESSION_MANAGER.getCurrSession(),
             req.getName());
       }
 
@@ -1945,7 +1962,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       plan = CreateTemplatePlan.deserializeFromReq(buffer);
       // check whether measurement is legal according to syntax convention
       PathUtils.isLegalMeasurementLists(plan.getMeasurements());
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1981,7 +1998,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     AppendTemplatePlan plan =
         new AppendTemplatePlan(
             req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -1989,7 +2006,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
     PruneTemplatePlan plan =
         new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -2043,21 +2060,21 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getTemplateName(),
           req.getPrefixPath());
     }
 
     try {
       SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2066,21 +2083,21 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} unset schema template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getPrefixPath(),
           req.getTemplateName());
     }
 
     try {
       UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
-      TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+      TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2089,19 +2106,19 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
-    if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
       return getNotLoggedInStatus();
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} drop schema template {}.",
-          SESSION_MANAGER.getCurrSessionId(),
+          SESSION_MANAGER.getCurrSession(),
           req.getTemplateName());
     }
 
     DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
-    TSStatus status = SESSION_MANAGER.checkAuthority(plan, req.getSessionId());
+    TSStatus status = SESSION_MANAGER.checkAuthority(plan, SESSION_MANAGER.getCurrSession());
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -2120,6 +2137,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     return SyncService.getInstance().transportFile(metaInfo, buff);
   }
 
+  @Override
+  public TSConnectionInfoResp fetchAllConnectionsInfo() {
+    throw new UnsupportedOperationException();
+  }
+
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     try {
       return serviceProvider.executeNonQuery(plan)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 036881d709..fb88642e84 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -160,4 +160,8 @@ public class TElasticFramedTransport extends TTransport {
   public void write(byte[] buf, int off, int len) {
     writeBuffer.write(buf, off, len);
   }
+
+  public TTransport getSocket() {
+    return underlying;
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 17f05e3f9f..3c5f54be2b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -28,7 +28,7 @@ import java.net.SocketException;
 public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport
     implements TimeoutChangeableTransport {
 
-  private TSocket underlyingSocket;
+  private final TSocket underlyingSocket;
 
   public TimeoutChangeableTFastFramedTransport(
       TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
@@ -46,6 +46,12 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
     return underlyingSocket.getSocket().getSoTimeout();
   }
 
+  @Override
+  public TTransport getSocket() {
+    // in fact, this should be the same as underlying...
+    return underlyingSocket;
+  }
+
   public static class Factory extends TTransportFactory {
 
     private final int thriftDefaultBufferSize;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index c7d89757f5..590005b8b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -28,7 +28,7 @@ import java.net.SocketException;
 public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFramedTransport
     implements TimeoutChangeableTransport {
 
-  private TSocket underlyingSocket;
+  private final TSocket underlyingSocket;
 
   public TimeoutChangeableTSnappyFramedTransport(
       TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
@@ -46,6 +46,12 @@ public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFrame
     return underlyingSocket.getSocket().getSoTimeout();
   }
 
+  @Override
+  public TTransport getSocket() {
+    // in fact, this should be the same as underlying...
+    return underlyingSocket;
+  }
+
   public static class Factory extends TTransportFactory {
 
     private final int thriftDefaultBufferSize;
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 427ced3466..540bad00cf 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.session;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.session.template.Template;
 import org.apache.iotdb.session.util.Version;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -435,4 +436,6 @@ public interface ISession extends AutoCloseable {
   void setEnableRedirection(boolean enableRedirection);
 
   void sortTablet(Tablet tablet);
+
+  TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5252acbc56..590784ca76 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.rpc.NoValidValueException;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -3249,6 +3250,11 @@ public class Session implements ISession {
     this.enableRedirection = enableRedirection;
   }
 
+  @Override
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+    return defaultSessionConnection.fetchAllConnections();
+  }
+
   public static class Builder {
     private String host = SessionConfig.DEFAULT_HOST;
     private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 43352fdc32..8a2d6d2e4a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -953,6 +954,22 @@ public class SessionConnection {
     }
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+    try {
+      return client.fetchAllConnectionsInfo();
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          return client.fetchAllConnectionsInfo();
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+  }
+
   public boolean isEnableRedirect() {
     return enableRedirect;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 60e94f76df..6cc47737c1 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session.pool;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionConfig;
 import org.apache.iotdb.session.SessionDataSet;
@@ -2305,6 +2306,26 @@ public class SessionPool {
     return connectionTimeoutInMs;
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        TSConnectionInfoResp resp = session.fetchAllConnections();
+        putBack(session);
+        return resp;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("fetchAllConnections failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (Throwable t) {
+        putBack(session);
+        throw t;
+      }
+    }
+    return null;
+  }
+
   public static class Builder {
 
     private String host = SessionConfig.DEFAULT_HOST;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 83d15737e1..1c8b1e048e 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -429,6 +429,23 @@ struct TSyncTransportMetaInfo{
   2:required i64 startIndex
 }
 
+enum TSConnectionType {
+  THRIFT_BASED
+  MQTT_BASED
+  INTERNAL
+}
+
+struct TSConnectionInfo {
+  1: required string userName
+  2: required i64 logInTime
+  3: required string connectionId // ip:port for thrift-based service and clientId for mqtt-based service
+  4: required TSConnectionType type
+}
+
+struct TSConnectionInfoResp {
+  1: required list<TSConnectionInfo> connectionInfoList
+}
+
 service IClientRPCService {
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
 
@@ -525,4 +542,6 @@ service IClientRPCService {
   common.TSStatus sendPipeData(1:binary buff);
 
   common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
+
+  TSConnectionInfoResp fetchAllConnectionsInfo();
 }
\ No newline at end of file