You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2022/10/23 15:45:31 UTC

[iotdb] 02/02: use threadLocal to replace session id

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32178b0e5e4560643fa1bf4718b911b9a04e94b8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sun Oct 23 23:45:10 2022 +0800

    use threadLocal<ClientSession> to replace session id
---
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |   4 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   9 +-
 external-api/pom.xml                               |   8 +
 .../external/api/thrift/JudgableServerContext.java |  41 +++
 .../external/api/thrift/ServerContextFactory.java  |  23 ++
 .../iotdb/db/protocol/mqtt/PublishHandler.java     |  35 ++-
 .../iotdb/db/qp/physical/crud/GroupByTimePlan.java |   2 +-
 .../apache/iotdb/db/qp/utils/DateTimeUtils.java    |   2 +-
 .../iotdb/db/query/control/SessionManager.java     | 153 +++++----
 .../db/query/control/SessionTimeoutManager.java    |  33 +-
 .../query/control/clientsession/ClientSession.java |  41 +++
 .../control/clientsession/IClientSession.java      | 101 ++++++
 .../control/clientsession/MqttClientSession.java   |  46 +++
 .../dataset/groupby/GroupByEngineDataSet.java      |   2 +-
 .../apache/iotdb/db/query/executor/fill/IFill.java |   2 +-
 .../iotdb/db/service/basic/ServiceProvider.java    |  75 +++--
 .../thrift/TThreadPoolServerWithContext.java       | 150 +++++++++
 .../db/service/thrift/ThriftServiceThread.java     |   3 +-
 .../thrift/handler/RPCServiceThriftHandler.java    |  67 +++-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  42 ++-
 .../db/service/thrift/impl/TSServiceImpl.java      | 341 +++++++++++----------
 22 files changed, 862 insertions(+), 320 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 1cbed9d0af..9de6b5608d 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -578,7 +578,7 @@ public abstract class AbstractCli {
    * @param columnCount the number of column
    * @param resultSetMetaData jdbc resultSetMetaData
    * @param zoneId your time zone
-   * @return List<List<String>> result
+   * @return {@literal List<List<String>> result}
    * @throws SQLException throw exception
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index f0495c43f8..c966382104 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -641,8 +641,8 @@ public class ImportCsv extends AbstractCsvTool {
    * read data from the CSV file
    *
    * @param path
-   * @return
-   * @throws IOException
+   * @return CSVParser csv parser
+   * @throws IOException when reading the csv file failed.
    */
   private static CSVParser readCsvFile(String path) throws IOException {
     return CSVFormat.Builder.create(CSVFormat.DEFAULT)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index f7c5e95854..68e4c08a2e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.cluster.server;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics;
 import org.apache.iotdb.db.service.thrift.ThriftService;
@@ -79,7 +81,12 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
               config.getThriftServerAwaitTimeForStopService(),
-              new RPCServiceThriftHandler(impl),
+              new RPCServiceThriftHandler(impl) {
+                @Override
+                protected SessionManager getSessionManager() {
+                  return ClusterSessionManager.getInstance();
+                }
+              },
               IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
diff --git a/external-api/pom.xml b/external-api/pom.xml
index 086c6a08b4..28ec1eed74 100644
--- a/external-api/pom.xml
+++ b/external-api/pom.xml
@@ -28,6 +28,14 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>external-api</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${thrift.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
     <profiles>
         <profile>
             <id>get-jar-with-dependencies</id>
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
new file mode 100644
index 0000000000..5948804a57
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.server.ServerContext;
+
+public interface JudgableServerContext extends ServerContext {
+
+  /**
+   * this method will be called when a client connects to the IoTDB server.
+   *
+   * @return true / false
+   */
+  public boolean authorised();
+
+  @Override
+  default <T> T unwrap(Class<T> iface) {
+    return null;
+  }
+
+  @Override
+  default boolean isWrapperFor(Class<?> iface) {
+    return false;
+  };
+}
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
new file mode 100644
index 0000000000..b6788b376a
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+public interface ServerContextFactory {
+  JudgableServerContext newServerContext();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 892e17cde5..8c6a1d9ad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -46,7 +46,8 @@ import java.util.concurrent.ConcurrentHashMap;
 public class PublishHandler extends AbstractInterceptHandler {
 
   private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
-  private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+      new ConcurrentHashMap<>();
   private static final boolean isEnableOperationSync =
       IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
   private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
@@ -68,15 +69,17 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onConnect(InterceptConnectMessage msg) {
-    if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
       try {
-        BasicOpenSessionResp basicOpenSessionResp =
-            serviceProvider.openSession(
-                msg.getUsername(),
-                new String(msg.getPassword()),
-                ZoneId.systemDefault().toString(),
-                TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
-        clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+        MqttClientSession session = new MqttClientSession(msg.getClientID());
+        // TODO should we put this session into a ThreadLocal in SessionManager?
+        serviceProvider.login(
+            session,
+            msg.getUsername(),
+            new String(msg.getPassword()),
+            ZoneId.systemDefault().toString(),
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        clientIdToSessionMap.put(msg.getClientID(), session);
       } catch (TException e) {
         throw new RuntimeException(e);
       }
@@ -85,19 +88,19 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onDisconnect(InterceptDisconnectMessage msg) {
-    Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
-    if (null != sessionId) {
-      serviceProvider.closeSession(sessionId);
+    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+    if (null != session) {
+      serviceProvider.closeSession(session);
     }
   }
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
     String clientId = msg.getClientID();
-    if (!clientIdToSessionIdMap.containsKey(clientId)) {
+    if (!clientIdToSessionMap.containsKey(clientId)) {
       return;
     }
-    long sessionId = clientIdToSessionIdMap.get(clientId);
+    MqttClientSession session = clientIdToSessionMap.get(clientId);
     ByteBuf payload = msg.getPayload();
     String topic = msg.getTopicName();
     String username = msg.getUsername();
@@ -132,7 +135,7 @@ public class PublishHandler extends AbstractInterceptHandler {
                 event.getTimestamp(),
                 event.getMeasurements().toArray(new String[0]),
                 event.getValues().toArray(new String[0]));
-        TSStatus tsStatus = serviceProvider.checkAuthority(plan, sessionId);
+        TSStatus tsStatus = serviceProvider.checkAuthority(plan, session);
         if (tsStatus != null) {
           LOG.warn(tsStatus.message);
         } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index 1fab54b56c..bfb8785eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan {
               plan.getEndTime(),
               plan.isSlidingStepByMonth(),
               plan.isIntervalByMonth(),
-              SessionManager.getInstance().getCurrSessionTimeZone())));
+              SessionManager.getInstance().getSessionTimeZone())));
     } else {
       return new GlobalTimeExpression(
           new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
index b6901726bd..eb25720a2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
@@ -584,7 +584,7 @@ public class DateTimeUtils {
           res *= 30 * 86_400_000L;
         } else {
           Calendar calendar = Calendar.getInstance();
-          calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+          calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
           calendar.setTimeInMillis(currentTime);
           calendar.add(Calendar.MONTH, (int) (value));
           res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 7e1861eeed..061f635d81 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
@@ -34,69 +35,110 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Session Manager is for mananging active sessions. It will be used by both Thrift based services
+ * (i.e., TSServiceImpl and InfluxdbService) and Mqtt based service. <br>
+ * Thrift based services are client-thread model, i.e., each client has a thread. So we can use
+ * threadLocal for such services.<br>
+ * However, Mqtt based service use message-thread model, i.e, each message has a short thread. So,
+ * we can not use threadLocal for such services.
+ */
 public class SessionManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
 
   // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+  /** currSession can be only used in client-thread model services. */
+  private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
+
+  // we keep this sessionIdGenerator just for keep Compatible with v0.13
+  @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
 
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
   // The statementId is unique in one IoTDB instance.
   private final AtomicLong statementIdGenerator = new AtomicLong();
 
   // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+  private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>();
   // (statementId -> Set(queryId))
   private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  // (sessionId -> client version number)
-  private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
-      new ConcurrentHashMap<>();
-
   protected SessionManager() {
     // singleton
   }
 
-  public Long getCurrSessionId() {
-    return currSessionId.get();
-  }
-
-  public void removeCurrSessionId() {
-    currSessionId.remove();
+  /**
+   * this method can be only used in client-thread model.
+   *
+   * @return
+   */
+  public IClientSession getCurrSession() {
+    return currSession.get();
   }
 
-  public TimeZone getCurrSessionTimeZone() {
-    if (getCurrSessionId() != null) {
-      return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+  public TimeZone getSessionTimeZone() {
+    IClientSession session = currSession.get();
+    if (session != null) {
+      return session.getTimeZone();
     } else {
       // only used for test
       return TimeZone.getTimeZone("+08:00");
     }
   }
 
-  public long requestSessionId(
-      String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
-    long sessionId = sessionIdGenerator.incrementAndGet();
-
-    currSessionId.set(sessionId);
-    sessionIdToUsername.put(sessionId, username);
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
-    sessionIdToClientVersion.put(sessionId, clientVersion);
+  /**
+   * this method can be only used in client-thread model. But, in message-thread model based
+   * service, calling this method has no side effect. <br>
+   * MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this
+   * method in thrift's event handler.
+   *
+   * @return
+   */
+  public void removeCurrSession() {
+    currSession.remove();
+  }
 
-    return sessionId;
+  /**
+   * this method can be only used in client-thread model. Do not use this method in message-thread
+   * model based service.
+   *
+   * @param session
+   * @return false if the session has been initialized.
+   */
+  public boolean registerSession(IClientSession session) {
+    if (this.currSession.get() != null) {
+      LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
+      return false;
+    }
+    this.currSession.set(session);
+    return true;
   }
 
-  public boolean releaseSessionResource(long sessionId) {
-    sessionIdToZoneId.remove(sessionId);
-    sessionIdToClientVersion.remove(sessionId);
+  /**
+   * must be called after registerSession()) will mark the session login.
+   *
+   * @param username
+   * @param zoneId
+   * @param clientVersion
+   */
+  public void supplySession(
+      IClientSession session,
+      String username,
+      String zoneId,
+      IoTDBConstant.ClientVersion clientVersion) {
+    session.setId(sessionIdGenerator.incrementAndGet());
+    session.setUsername(username);
+    session.setZoneId(ZoneId.of(zoneId));
+    session.setClientVersion(clientVersion);
+    session.setLogin(true);
+  }
 
-    Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
+  /**
+   * @param session
+   * @return true if releasing successfully, false otherwise (e.g., the session does not exist)
+   */
+  public boolean releaseSessionResource(IClientSession session) {
+    Set<Long> statementIdSet = sessionToStatementId.remove(session);
     if (statementIdSet != null) {
       for (Long statementId : statementIdSet) {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -106,34 +148,41 @@ public class SessionManager {
           }
         }
       }
+      return true;
     }
-
-    return sessionIdToUsername.remove(sessionId) != null;
+    // TODO if there is no statement for the session, how to return (true or false?)
+    return false;
   }
 
-  public long getSessionIdByQueryId(long queryId) {
+  /**
+   * @param queryId
+   * @return null if not found. (e.g., the client session is closed already. TODO: do we really have
+   *     this case?)
+   */
+  public IClientSession getSessionIdByQueryId(long queryId) {
     // TODO: make this more efficient with a queryId -> sessionId map
     for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
       if (statementToQueries.getValue().contains(queryId)) {
-        for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
+        for (Map.Entry<IClientSession, Set<Long>> sessionToStatements :
+            sessionToStatementId.entrySet()) {
           if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
             return sessionToStatements.getKey();
           }
         }
       }
     }
-    return -1;
+    return null;
   }
 
-  public long requestStatementId(long sessionId) {
+  public long requestStatementId(IClientSession session) {
     long statementId = statementIdGenerator.incrementAndGet();
-    sessionIdToStatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+    sessionToStatementId
+        .computeIfAbsent(session, s -> new CopyOnWriteArraySet<>())
         .add(statementId);
     return statementId;
   }
 
-  public void closeStatement(long sessionId, long statementId) {
+  public void closeStatement(IClientSession session, long statementId) {
     Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
     if (queryIdSet != null) {
       for (Long queryId : queryIdSet) {
@@ -141,8 +190,8 @@ public class SessionManager {
       }
     }
 
-    if (sessionIdToStatementId.containsKey(sessionId)) {
-      sessionIdToStatementId.get(sessionId).remove(statementId);
+    if (sessionToStatementId.containsKey(session)) {
+      sessionToStatementId.get(session).remove(statementId);
     }
   }
 
@@ -176,18 +225,6 @@ public class SessionManager {
     }
   }
 
-  public String getUsername(Long sessionId) {
-    return sessionIdToUsername.get(sessionId);
-  }
-
-  public ZoneId getZoneId(Long sessionId) {
-    return sessionIdToZoneId.get(sessionId);
-  }
-
-  public void setTimezone(Long sessionId, String zone) {
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
-  }
-
   public boolean hasDataset(Long queryId) {
     return queryIdToDataSet.containsKey(queryId);
   }
@@ -211,10 +248,6 @@ public class SessionManager {
     }
   }
 
-  public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) {
-    return sessionIdToClientVersion.get(sessionId);
-  }
-
   public static SessionManager getInstance() {
     return SessionManagerHelper.INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 1f179c5e40..c9ff8efb02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 
 import org.slf4j.Logger;
@@ -37,7 +38,7 @@ public class SessionTimeoutManager {
   private static final long SESSION_TIMEOUT =
       IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
 
-  private Map<Long, Long> sessionIdToLastActiveTime;
+  private Map<IClientSession, Long> sessionToLastActiveTime;
   private ScheduledExecutorService executorService;
 
   private SessionTimeoutManager() {
@@ -45,7 +46,7 @@ public class SessionTimeoutManager {
       return;
     }
 
-    this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
+    this.sessionToLastActiveTime = new ConcurrentHashMap<>();
     this.executorService =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
 
@@ -59,37 +60,43 @@ public class SessionTimeoutManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void register(long sessionId) {
+  public void register(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
+    sessionToLastActiveTime.put(session, System.currentTimeMillis());
   }
 
-  public boolean unregister(long sessionId) {
+  /**
+   * unregister the session and release all its query resources.
+   *
+   * @param session
+   * @return true if removing successfully, false otherwise (e.g., the session does not exist)
+   */
+  public boolean unregister(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
-      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId);
+      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session);
     }
 
-    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) {
-      return sessionIdToLastActiveTime.remove(sessionId) != null;
+    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) {
+      return sessionToLastActiveTime.remove(session) != null;
     }
 
     return false;
   }
 
-  public void refresh(long sessionId) {
+  public void refresh(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
+    sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis());
   }
 
   private void cleanup() {
     long currentTime = System.currentTimeMillis();
-    sessionIdToLastActiveTime.entrySet().stream()
+    sessionToLastActiveTime.entrySet().stream()
         .filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
         .forEach(
             entry -> {
@@ -106,11 +113,11 @@ public class SessionTimeoutManager {
     return SessionTimeoutManagerHelper.INSTANCE;
   }
 
-  public boolean isSessionAlive(long sessionId) {
+  public boolean isSessionAlive(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return true;
     }
-    return sessionIdToLastActiveTime.containsKey(sessionId);
+    return sessionToLastActiveTime.containsKey(session);
   }
 
   private static class SessionTimeoutManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
new file mode 100644
index 0000000000..9edcc6f077
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import java.net.InetSocketAddress;
+
+/** Client Session is the only identity for a connection. */
+public class ClientSession extends IClientSession {
+
+  InetSocketAddress clientNet;
+
+  public ClientSession(InetSocketAddress clientNet) {
+    this.clientNet = clientNet;
+  }
+
+  @Override
+  public String getClientAddress() {
+    return clientNet.getHostName();
+  }
+
+  @Override
+  int getClientPort() {
+    return clientNet.getPort();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
new file mode 100644
index 0000000000..b3ae42e68b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
+
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+public abstract class IClientSession {
+
+  /** id is just used for keep compatible with v0.13 */
+  @Deprecated long id;
+
+  ClientVersion clientVersion;
+
+  ZoneId zoneId;
+
+  // TODO: why some Statement Plans use timeZone while others use ZoneId?
+  TimeZone timeZone;
+
+  String username;
+
+  boolean login = false;
+
+  abstract String getClientAddress();
+
+  abstract int getClientPort();
+
+  public void setClientVersion(ClientVersion clientVersion) {
+    this.clientVersion = clientVersion;
+  }
+
+  public ClientVersion getClientVersion() {
+    return this.clientVersion;
+  }
+
+  public ZoneId getZoneId() {
+    return this.zoneId;
+  }
+
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+    this.timeZone = TimeZone.getTimeZone(zoneId);
+  }
+
+  public TimeZone getTimeZone() {
+    return timeZone;
+  }
+
+  public void setTimeZone(TimeZone timeZone) {
+    this.timeZone = timeZone;
+    this.zoneId = timeZone.toZoneId();
+  }
+
+  public String getUsername() {
+    return this.username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public boolean isLogin() {
+    return login;
+  }
+
+  public void setLogin(boolean login) {
+    this.login = login;
+  }
+
+  @Deprecated
+  public long getId() {
+    return id;
+  }
+
+  @Deprecated
+  public void setId(long id) {
+    this.id = id;
+  }
+
+  public String toString() {
+    return String.format("%d-%s:%d", getId(), getClientAddress(), getClientPort());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
new file mode 100644
index 0000000000..b458cb22f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+public class MqttClientSession extends IClientSession {
+
+  String clientID;
+
+  public MqttClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  public String getClientID() {
+    return clientID;
+  }
+
+  @Override
+  public String getClientAddress() {
+    return clientID;
+  }
+
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  public String toString() {
+    return String.format("%d-%s", getId(), getClientID());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index ccd374b9b3..ed54e5920a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -209,7 +209,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
    */
   public static long calcIntervalByMonth(long startTime, long numMonths) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     boolean isLastDayOfMonth =
         calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 5fbb0575dd..0ccfb23e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -143,7 +143,7 @@ public abstract class IFill {
 
   protected long slideMonth(long startTime, int monthNum) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     calendar.add(Calendar.MONTH, monthNum);
     return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 0806dec7a7..0987d2d97f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.tracing.TracingManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -58,6 +59,10 @@ import java.sql.SQLException;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 
+/**
+ * There is only one ServiceProvider instance for each IoTDB instance. Both client-thread model
+ * based services and message-thread model based services (e.g., mqtt) are using this service.
+ */
 public abstract class ServiceProvider {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
@@ -105,14 +110,13 @@ public abstract class ServiceProvider {
    *
    * @return true: If logged in; false: If not logged in
    */
-  public boolean checkLogin(long sessionId) {
-    Long currSessionId = SESSION_MANAGER.getCurrSessionId();
-    boolean isLoggedIn = currSessionId != null && currSessionId == sessionId;
+  public boolean checkLogin(IClientSession session) {
+    boolean isLoggedIn = session != null && session.isLogin();
     if (!isLoggedIn) {
       LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
       return false;
     } else {
-      SessionTimeoutManager.getInstance().refresh(sessionId);
+      SessionTimeoutManager.getInstance().refresh(session);
     }
     return isLoggedIn;
   }
@@ -120,11 +124,11 @@ public abstract class ServiceProvider {
   /**
    * Check whether current session is timeout.
    *
-   * @param sessionId Session id.
+   * @param session clientSession.
    * @return true: If session timeout; false: If not session timeout.
    */
-  public boolean checkSessionTimeout(long sessionId) {
-    if (!SessionTimeoutManager.getInstance().isSessionAlive(sessionId)) {
+  public boolean checkSessionTimeout(IClientSession session) {
+    if (!SessionTimeoutManager.getInstance().isSessionAlive(session)) {
       return true;
     }
     return false;
@@ -143,9 +147,9 @@ public abstract class ServiceProvider {
         username, plan.getAuthPaths(), plan.getOperatorType(), targetUser);
   }
 
-  public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+  public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
     try {
-      if (!checkAuthorization(plan, SESSION_MANAGER.getUsername(sessionId))) {
+      if (!checkAuthorization(plan, session.getUsername())) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation, please add privilege "
@@ -162,7 +166,8 @@ public abstract class ServiceProvider {
     return null;
   }
 
-  public BasicOpenSessionResp openSession(
+  public BasicOpenSessionResp login(
+      IClientSession session,
       String username,
       String password,
       String zoneId,
@@ -187,7 +192,6 @@ public abstract class ServiceProvider {
       loginMessage = e.getMessage();
     }
 
-    long sessionId = -1;
     if (status) {
       // check the version compatibility
       boolean compatible = checkCompatibility(tsProtocolVersion);
@@ -195,74 +199,69 @@ public abstract class ServiceProvider {
         openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
         openSessionResp.setMessage(
             "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
-        return openSessionResp.sessionId(sessionId);
+        return openSessionResp.sessionId(-1);
       }
 
       openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       openSessionResp.setMessage("Login successfully");
 
-      sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
-
+      SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion);
       LOGGER.info(
           "{}: Login status: {}. User : {}, opens Session-{}",
           IoTDBConstant.GLOBAL_DB_NAME,
           openSessionResp.getMessage(),
           username,
-          sessionId);
+          session);
     } else {
       openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
       openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
-
-      sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
       AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+      // TODO we should close this connection ASAP, otherwise there will be DDoS.
     }
-
-    SessionTimeoutManager.getInstance().register(sessionId);
-    return openSessionResp.sessionId(sessionId);
+    SessionTimeoutManager.getInstance().register(session);
+    return openSessionResp.sessionId(session == null ? -1 : session.getId());
   }
 
-  public BasicOpenSessionResp openSession(
-      String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+  public BasicOpenSessionResp login(
+      IClientSession session,
+      String username,
+      String password,
+      String zoneId,
+      TSProtocolVersion tsProtocolVersion)
       throws TException {
-    return openSession(
-        username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+    return login(
+        session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
   }
 
-  public boolean closeSession(long sessionId) {
-    AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-
-    SESSION_MANAGER.removeCurrSessionId();
-
-    return SessionTimeoutManager.getInstance().unregister(sessionId);
+  public boolean closeSession(IClientSession session) {
+    AUDIT_LOGGER.info("Session-{} is closing", session);
+    return SessionTimeoutManager.getInstance().unregister(session);
   }
 
   public TSStatus closeOperation(
-      long sessionId,
+      IClientSession session,
       long queryId,
       long statementId,
       boolean haveStatementId,
       boolean haveSetQueryId) {
-    if (!checkLogin(sessionId)) {
+    if (!checkLogin(session)) {
       return RpcUtils.getStatus(
           TSStatusCode.NOT_LOGIN_ERROR,
           "Log in failed. Either you are not authorized or the session has timed out.");
     }
-    if (checkSessionTimeout(sessionId)) {
+    if (checkSessionTimeout(session)) {
       return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "{}: receive close operation from Session {}",
-          IoTDBConstant.GLOBAL_DB_NAME,
-          SESSION_MANAGER.getCurrSessionId());
+          "{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, session);
     }
-
     try {
       if (haveStatementId) {
         if (haveSetQueryId) {
           SESSION_MANAGER.closeDataset(statementId, queryId);
         } else {
-          SESSION_MANAGER.closeStatement(sessionId, statementId);
+          SESSION_MANAGER.closeStatement(session, statementId);
         }
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
new file mode 100644
index 0000000000..8d36047509
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
@@ -0,0 +1,150 @@
+package org.apache.iotdb.db.service.thrift;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+
+public class TThreadPoolServerWithContext extends TThreadPoolServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServerWithContext.class);
+
+  public TThreadPoolServerWithContext(Args args) {
+    super(args);
+  }
+
+  @Override
+  protected void execute() {
+    while (!stopped_) {
+      try {
+        TTransport client = serverTransport_.accept();
+        try {
+          getExecutorService().execute(new WorkerProcess(client));
+        } catch (RejectedExecutionException ree) {
+          if (!stopped_) {
+            LOGGER.warn(
+                "ThreadPool is saturated with incoming requests. Closing latest connection.");
+          }
+          client.close();
+        }
+      } catch (TTransportException ttx) {
+        if (!stopped_) {
+          LOGGER.warn("Transport error occurred during acceptance of message", ttx);
+        }
+      }
+    }
+  }
+
+  // The following codes are copied from TThreadPoolServer.WorkerProcess
+  // and we add additional processing for connectionContext
+
+  private class WorkerProcess implements Runnable {
+
+    /** Client that this services. */
+    private TTransport client_;
+
+    /**
+     * Default constructor.
+     *
+     * @param client Transport to process
+     */
+    private WorkerProcess(TTransport client) {
+      client_ = client;
+    }
+
+    /** Loops on processing a client forever */
+    public void run() {
+      TProcessor processor = null;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+
+      Optional<TServerEventHandler> eventHandler = Optional.empty();
+      ServerContext connectionContext = null;
+
+      try {
+        processor = processorFactory_.getProcessor(client_);
+        inputTransport = inputTransportFactory_.getTransport(client_);
+        outputTransport = outputTransportFactory_.getTransport(client_);
+        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+
+        eventHandler = Optional.ofNullable(getEventHandler());
+
+        if (eventHandler.isPresent()) {
+          connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
+          LOGGER.error("测试一下效果。、。。。。");
+        }
+
+        while (true) {
+          if (Thread.currentThread().isInterrupted()) {
+            LOGGER.debug("WorkerProcess requested to shutdown");
+            break;
+          }
+          if (eventHandler.isPresent()) {
+            eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
+          }
+          // This process cannot be interrupted by Interrupting the Thread. This
+          // will return once a message has been processed or the socket timeout
+          // has elapsed, at which point it will return and check the interrupt
+          // state of the thread.
+          processor.process(inputProtocol, outputProtocol);
+        }
+      } catch (Exception x) {
+        LOGGER.debug("Error processing request", x);
+
+        // We'll usually receive RuntimeException types here
+        // Need to unwrap to ascertain real causing exception before we choose to ignore
+        // Ignore err-logging all transport-level/type exceptions
+        if (!isIgnorableException(x)) {
+          // Log the exception at error level and continue
+          LOGGER.error(
+              (x instanceof TException ? "Thrift " : "")
+                  + "Error occurred during processing of message.",
+              x);
+        }
+      } finally {
+        if (eventHandler.isPresent()) {
+          eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
+        }
+        if (inputTransport != null) {
+          inputTransport.close();
+        }
+        if (outputTransport != null) {
+          outputTransport.close();
+        }
+        if (client_.isOpen()) {
+          client_.close();
+        }
+      }
+    }
+
+    private boolean isIgnorableException(Exception x) {
+      TTransportException tTransportException = null;
+
+      if (x instanceof TTransportException) {
+        tTransportException = (TTransportException) x;
+      } else if (x.getCause() instanceof TTransportException) {
+        tTransportException = (TTransportException) x.getCause();
+      }
+
+      if (tTransportException != null) {
+        switch (tTransportException.getType()) {
+          case TTransportException.END_OF_FILE:
+          case TTransportException.TIMED_OUT:
+            return true;
+        }
+      }
+      return false;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 53cf755ad0..edf98d1774 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -154,7 +154,8 @@ public class ThriftServiceThread extends Thread {
       serverTransport = openTransport(bindAddress, port);
       TThreadPoolServer.Args poolArgs =
           initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
-      poolServer = new TThreadPoolServer(poolArgs);
+      poolServer = new TThreadPoolServerWithContext(poolArgs);
+      logger.warn("注册EventHandler");
       poolServer.setServerEventHandler(serverEventHandler);
     } catch (TTransportException e) {
       catchFailedInitialization(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 6761c0d1c9..e7e8c3ce4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,37 +16,92 @@
  */
 package org.apache.iotdb.db.service.thrift.handler;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
 
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class RPCServiceThriftHandler implements TServerEventHandler {
+  private static final Logger logger = LoggerFactory.getLogger(RPCServiceThriftHandler.class);
   private TSServiceImpl serviceImpl;
   private AtomicLong thriftConnectionNumber = new AtomicLong(0);
 
+  private ServerContextFactory factory = null;
+
   public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
     this.serviceImpl = serviceImpl;
     MetricService.getInstance()
         .addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber));
+    String factoryClass =
+        IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getCustomizedProperties()
+            .getProperty("rpc_service_thrift_handler_context_class");
+    if (factoryClass != null) {
+      try {
+        factory = (ServerContextFactory) Class.forName(factoryClass).newInstance();
+      } catch (Exception e) {
+        logger.warn(
+            "configuration announced ServerContextFactory {}, but it is not found in classpath",
+            factoryClass);
+        factory = null;
+      }
+    }
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    logger.info("创建了连接");
     thriftConnectionNumber.incrementAndGet();
+    Socket socket =
+        ((TSocket) ((TElasticFramedTransport) in.getTransport()).getSocket()).getSocket();
+    logger.info(
+        "in local: {}:{}, remote: {}, default: {}",
+        socket.getLocalSocketAddress(),
+        socket.getLocalPort(),
+        socket.getRemoteSocketAddress(),
+        socket.getPort());
+    socket = ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    logger.info(
+        "out local: {}:{}, remote: {}, default: {}",
+        socket.getLocalSocketAddress(),
+        socket.getLocalPort(),
+        socket.getRemoteSocketAddress(),
+        socket.getPort());
+    getSessionManager()
+        .registerSession(new ClientSession((InetSocketAddress) socket.getRemoteSocketAddress()));
+    if (factory != null) {
+      JudgableServerContext context = factory.newServerContext();
+      // TODO
+      return context;
+    }
+
     return null;
   }
 
   @Override
   public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+    logger.info("移除了连接");
     // release query resources.
     serviceImpl.handleClientExit();
     thriftConnectionNumber.decrementAndGet();
+    getSessionManager().removeCurrSession();
   }
 
   @Override
@@ -58,4 +113,14 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
   public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
     // nothing
   }
+
+  /**
+   * get the SessionManager Instance. <br>
+   * in v0.13, Cluster mode uses different SessionManager instance...
+   *
+   * @return
+   */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index 7febf56420..e2467d0d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
@@ -59,6 +61,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   private final InfluxDBMetaManager metaManager;
 
+  private SessionManager sessionManager = SessionManager.getInstance();
+
   public InfluxDBServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
     metaManager = InfluxDBMetaManager.getInstance();
@@ -66,9 +70,14 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+    IClientSession session = sessionManager.getCurrSession();
     BasicOpenSessionResp basicOpenSessionResp =
-        serviceProvider.openSession(
-            req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        serviceProvider.login(
+            session,
+            req.username,
+            req.password,
+            req.zoneId,
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
     return new TSOpenSessionResp()
         .setStatus(
             RpcUtils.getInfluxDBStatus(
@@ -78,15 +87,17 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
+    IClientSession session = sessionManager.getCurrSession();
     return new TSStatus(
-        !serviceProvider.closeSession(req.sessionId)
+        !serviceProvider.closeSession(session)
             ? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
   @Override
   public TSStatus writePoints(TSWritePointsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.sessionId);
+    IClientSession session = sessionManager.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
@@ -97,7 +108,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
       IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager);
       try {
         InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
-        TSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
+        TSStatus tsStatus = executeNonQueryPlan(session, plan, req.sessionId);
         if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
           executeCode = tsStatus.getCode();
@@ -118,11 +129,11 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
     return tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
 
-  private TSStatus checkLoginStatus(long sessionId) {
-    if (!serviceProvider.checkLogin(sessionId)) {
+  private TSStatus checkLoginStatus(IClientSession session) {
+    if (!serviceProvider.checkLogin(session)) {
       return getNotLoggedInStatus();
     }
-    if (serviceProvider.checkSessionTimeout(sessionId)) {
+    if (serviceProvider.checkSessionTimeout(session)) {
       return RpcUtils.getInfluxDBStatus(
           TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
     }
@@ -131,14 +142,15 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSStatus createDatabase(TSCreateDatabaseReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.sessionId);
+    IClientSession session = sessionManager.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     try {
       SetStorageGroupPlan setStorageGroupPlan =
           new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
-      return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId());
+      return executeNonQueryPlan(session, setStorageGroupPlan, req.getSessionId());
     } catch (IllegalPathException
         | QueryProcessException
         | StorageGroupNotSetException
@@ -152,9 +164,9 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
   }
 
   public void handleClientExit() {
-    Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      closeSession(new TSCloseSessionReq(sessionId));
+    IClientSession session = sessionManager.getCurrSession();
+    if (session != null) {
+      closeSession(new TSCloseSessionReq(session.getId()));
     }
   }
 
@@ -164,10 +176,10 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
 
-  private TSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId)
+  private TSStatus executeNonQueryPlan(IClientSession session, PhysicalPlan plan, long sessionId)
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     org.apache.iotdb.service.rpc.thrift.TSStatus status =
-        serviceProvider.checkAuthority(plan, sessionId);
+        serviceProvider.checkAuthority(plan, session);
     if (status == null) {
       status =
           serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7e862e532f..5f0793562f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -178,7 +179,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
     private PhysicalPlan plan;
     private final long queryStartTime;
-    private final long sessionId;
+    private final IClientSession session;
     private final String statement;
     private final long statementId;
     private final long timeout;
@@ -194,7 +195,7 @@ public class TSServiceImpl implements TSIService.Iface {
     public QueryTask(
         PhysicalPlan plan,
         long queryStartTime,
-        long sessionId,
+        IClientSession session,
         String statement,
         long statementId,
         long timeout,
@@ -203,7 +204,7 @@ public class TSServiceImpl implements TSIService.Iface {
         boolean enableRedirectQuery) {
       this.plan = plan;
       this.queryStartTime = queryStartTime;
-      this.sessionId = sessionId;
+      this.session = session;
       this.statement = statement;
       this.statementId = statementId;
       this.timeout = timeout;
@@ -214,11 +215,11 @@ public class TSServiceImpl implements TSIService.Iface {
 
     @Override
     public TSExecuteStatementResp call() throws Exception {
-      String username = SESSION_MANAGER.getUsername(sessionId);
+      String username = session.getUsername();
       plan.setLoginUserName(username);
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
-      AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
 
       final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
       QueryContext context =
@@ -252,13 +253,13 @@ public class TSServiceImpl implements TSIService.Iface {
 
   protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
-    private final long sessionId;
+    private final IClientSession session;
     private final long queryId;
     private final int fetchSize;
     private final boolean isAlign;
 
-    public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) {
-      this.sessionId = sessionId;
+    public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
+      this.session = session;
       this.queryId = queryId;
       this.fetchSize = fetchSize;
       this.isAlign = isAlign;
@@ -270,8 +271,7 @@ public class TSServiceImpl implements TSIService.Iface {
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try {
         if (isAlign) {
-          TSQueryDataSet result =
-              fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+          TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = result.bufferForTime().limit() != 0;
           if (!hasResultSet) {
             SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
@@ -281,8 +281,7 @@ public class TSServiceImpl implements TSIService.Iface {
           resp.setIsAlign(true);
         } else {
           TSQueryNonAlignDataSet nonAlignResult =
-              fillRpcNonAlignReturnData(
-                  fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+              fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = false;
           for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
             if (timeBuffer.limit() != 0) {
@@ -323,8 +322,13 @@ public class TSServiceImpl implements TSIService.Iface {
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
     BasicOpenSessionResp openSessionResp =
-        serviceProvider.openSession(
-            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+        serviceProvider.login(
+            SESSION_MANAGER.getCurrSession(),
+            req.username,
+            req.password,
+            req.zoneId,
+            req.client_protocol,
+            clientVersion);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
@@ -341,7 +345,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
-        !serviceProvider.closeSession(req.sessionId)
+        !serviceProvider.closeSession(SESSION_MANAGER.getCurrSession())
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -355,13 +359,18 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
     return serviceProvider.closeOperation(
-        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+        SESSION_MANAGER.getCurrSession(),
+        req.queryId,
+        req.statementId,
+        req.isSetStatementId(),
+        req.isSetQueryId());
   }
 
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     TSFetchMetadataResp resp = new TSFetchMetadataResp();
-    TSStatus status = checkLoginStatus(req.getSessionId());
+    TSStatus status = checkLoginStatus(session);
     if (isStatusNotSuccess(status)) {
       return resp.setStatus(status);
     }
@@ -503,10 +512,11 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     List<TSStatus> result = new ArrayList<>();
     boolean isAllSuccessful = true;
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
@@ -522,10 +532,7 @@ public class TSServiceImpl implements TSIService.Iface {
         PhysicalPlan physicalPlan =
             serviceProvider
                 .getPlanner()
-                .parseSQLToPhysicalPlan(
-                    statement,
-                    SESSION_MANAGER.getZoneId(req.sessionId),
-                    SESSION_MANAGER.getClientVersion(req.sessionId));
+                .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
         if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -539,7 +546,7 @@ public class TSServiceImpl implements TSIService.Iface {
             index = 0;
           }
 
-          TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
           if (status != null) {
             insertRowsPlan.getResults().put(index, status);
             isAllSuccessful = false;
@@ -561,7 +568,7 @@ public class TSServiceImpl implements TSIService.Iface {
             multiPlan = new CreateMultiTimeSeriesPlan();
             executeList.add(multiPlan);
           }
-          TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
           if (status != null) {
             multiPlan.getResults().put(i, status);
             isAllSuccessful = false;
@@ -586,7 +593,7 @@ public class TSServiceImpl implements TSIService.Iface {
             executeList.clear();
           }
           long t2 = System.currentTimeMillis();
-          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId());
+          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, session);
           addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
           result.add(resp.status);
           if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -611,32 +618,24 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     String statement = req.getStatement();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
-
       long startTime = System.currentTimeMillis();
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getZoneId(req.getSessionId()),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
+        return submitQueryTask(session, physicalPlan, startTime, req);
       } else {
         return executeUpdateStatement(
-            statement,
-            req.statementId,
-            physicalPlan,
-            req.fetchSize,
-            req.timeout,
-            req.getSessionId());
+            session, statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
       }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -651,8 +650,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -661,13 +661,10 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
+        return submitQueryTask(session, physicalPlan, startTime, req);
       } else {
         return RpcUtils.getTSExecuteStatementResp(
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -687,8 +684,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -696,10 +694,7 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .rawDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .rawDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -708,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        session,
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -733,8 +728,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -742,10 +738,7 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .lastDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .lastDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -754,7 +747,7 @@ public class TSServiceImpl implements TSIService.Iface {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        session,
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -778,8 +771,9 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSExecuteStatementResp submitQueryTask(
-      PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
-    TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+      IClientSession session, PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req)
+      throws Exception {
+    TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -788,7 +782,7 @@ public class TSServiceImpl implements TSIService.Iface {
         new QueryTask(
             physicalPlan,
             startTime,
-            req.sessionId,
+            session,
             req.statement,
             req.statementId,
             req.timeout,
@@ -929,15 +923,15 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSExecuteStatementResp executeSelectIntoStatement(
+      IClientSession session,
       String statement,
       long statementId,
       PhysicalPlan physicalPlan,
       int fetchSize,
-      long timeout,
-      long sessionId)
+      long timeout)
       throws IoTDBException, TException, SQLException, IOException, InterruptedException,
           QueryFilterOptimizationException {
-    TSStatus status = serviceProvider.checkAuthority(physicalPlan, sessionId);
+    TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -951,8 +945,7 @@ public class TSServiceImpl implements TSIService.Iface {
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
     QUERY_FREQUENCY_RECORDER.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+    AUDIT_LOGGER.debug("Session {} execute select into: {}", session, statement);
     if (queryPlan.isEnableTracing()) {
       TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
     }
@@ -970,7 +963,7 @@ public class TSServiceImpl implements TSIService.Iface {
         if (insertTabletPlans.isEmpty()) {
           continue;
         }
-        TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId);
+        TSStatus executionStatus = insertTabletsInternally(session, insertTabletPlans);
         if (isStatusNotSuccess(executionStatus)
             && executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
           return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId);
@@ -989,11 +982,11 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSStatus insertTabletsInternally(
-      List<InsertTabletPlan> insertTabletPlans, long sessionId) {
+      IClientSession session, List<InsertTabletPlan> insertTabletPlans) {
     InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
     for (int i = 0; i < insertTabletPlans.size(); i++) {
       InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
 
       if (status != null) {
         // not authorized
@@ -1009,7 +1002,8 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      IClientSession session = SESSION_MANAGER.getCurrSession();
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSFetchResultsResp(loginStatus);
       }
@@ -1020,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       Future<TSFetchResultsResp> resp =
           QueryTaskManager.getInstance()
-              .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign));
+              .submit(new FetchResultsTask(session, req.queryId, req.fetchSize, req.isAlign));
       return resp.get();
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -1075,7 +1069,8 @@ public class TSServiceImpl implements TSIService.Iface {
   /** update statement can be: 1. select-into statement 2. non-query statement */
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return RpcUtils.getTSExecuteStatementResp(loginStatus);
     }
@@ -1084,19 +1079,12 @@ public class TSServiceImpl implements TSIService.Iface {
           serviceProvider
               .getPlanner()
               .parseSQLToPhysicalPlan(
-                  req.statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  req.statement, session.getZoneId(), session.getClientVersion());
       return physicalPlan.isQuery()
           ? RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
           : executeUpdateStatement(
-              req.statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              req.getSessionId());
+              session, req.statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -1112,21 +1100,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   /** update statement can be: 1. select-into statement 2. non-query statement */
   private TSExecuteStatementResp executeUpdateStatement(
+      IClientSession session,
       String statement,
       long statementId,
       PhysicalPlan plan,
       int fetchSize,
-      long timeout,
-      long sessionId)
+      long timeout)
       throws TException, SQLException, IoTDBException, IOException, InterruptedException,
           QueryFilterOptimizationException {
     return plan.isSelectInto()
-        ? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId)
-        : executeNonQueryStatement(plan, sessionId);
+        ? executeSelectIntoStatement(session, statement, statementId, plan, fetchSize, timeout)
+        : executeNonQueryStatement(plan, session);
   }
 
-  private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
-    TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+  private TSExecuteStatementResp executeNonQueryStatement(
+      PhysicalPlan plan, IClientSession session) {
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null
         ? new TSExecuteStatementResp(status)
         : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
@@ -1134,9 +1123,9 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   public void handleClientExit() {
-    Long sessionId = SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      TSCloseSessionReq req = new TSCloseSessionReq();
       closeSession(req);
     }
   }
@@ -1151,7 +1140,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1166,7 +1155,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
@@ -1202,14 +1191,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecords(TSInsertRecordsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1224,7 +1214,7 @@ public class TSServiceImpl implements TSIService.Iface {
                 req.getMeasurementsList().get(i).toArray(new String[0]),
                 req.valuesList.get(i),
                 req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
           allCheckSuccess = false;
@@ -1254,14 +1244,14 @@ public class TSServiceImpl implements TSIService.Iface {
   /**
    * Checking the Login Status.
    *
-   * @param sessionId Session id.
+   * @param session client session.
    * @return When not login or session timeout, will return error status.
    */
-  private TSStatus checkLoginStatus(long sessionId) {
-    if (!serviceProvider.checkLogin(sessionId)) {
+  private TSStatus checkLoginStatus(IClientSession session) {
+    if (!serviceProvider.checkLogin(session)) {
       return getNotLoggedInStatus();
     }
-    if (serviceProvider.checkSessionTimeout(sessionId)) {
+    if (serviceProvider.checkSessionTimeout(session)) {
       return getSessionTimeoutStatus();
     }
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1290,14 +1280,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1311,7 +1302,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.getMeasurementsList(),
               req.getValuesList(),
               req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       statusList.add(status != null ? status : executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
       statusList.add(
@@ -1336,14 +1327,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1359,7 +1351,7 @@ public class TSServiceImpl implements TSIService.Iface {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1394,14 +1386,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1417,7 +1410,7 @@ public class TSServiceImpl implements TSIService.Iface {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1507,14 +1500,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecord(TSInsertRecordReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1525,7 +1519,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.getMeasurements().toArray(new String[0]),
               req.values,
               req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       if (status != null) {
         return status;
@@ -1542,14 +1536,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1561,7 +1556,7 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       if (status != null) {
         return status;
@@ -1578,8 +1573,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1591,7 +1587,7 @@ public class TSServiceImpl implements TSIService.Iface {
         paths.add(new PartialPath(path));
       }
       plan.addPaths(paths);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
@@ -1604,9 +1600,10 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertTablet(TSInsertTabletReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1621,7 +1618,7 @@ public class TSServiceImpl implements TSIService.Iface {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
       insertTabletPlan.setAligned(req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
 
       if (status != null) {
         return status;
@@ -1640,13 +1637,14 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
-      return insertTabletsInternally(req);
+      return insertTabletsInternally(session, req);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode());
     } catch (NullPointerException e) {
@@ -1662,6 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
       throws IllegalPathException {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     InsertTabletPlan insertTabletPlan =
         new InsertTabletPlan(new PartialPath(req.prefixPaths.get(i)), req.measurementsList.get(i));
     insertTabletPlan.setTimes(
@@ -1682,12 +1681,13 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   /** construct one InsertMultiTabletPlan and process it */
-  public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws IllegalPathException {
+  private TSStatus insertTabletsInternally(IClientSession session, TSInsertTabletsReq req)
+      throws IllegalPathException {
     List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
     InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
     for (int i = 0; i < req.prefixPaths.size(); i++) {
       InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
       if (status != null) {
         // not authorized
         insertMultiTabletPlan.getResults().put(i, status);
@@ -1701,14 +1701,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
 
       SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1721,8 +1722,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1732,7 +1734,7 @@ public class TSServiceImpl implements TSIService.Iface {
         storageGroupList.add(new PartialPath(storageGroup));
       }
       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1744,14 +1746,14 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+        AUDIT_LOGGER.debug("Session-{} create timeseries {}", session, req.getPath());
       }
 
       CreateTimeSeriesPlan plan =
@@ -1764,7 +1766,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.tags,
               req.attributes,
               req.measurementAlias);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1776,8 +1778,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1785,7 +1788,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            session,
             req.getPrefixPath(),
             req.getMeasurements());
       }
@@ -1811,7 +1814,7 @@ public class TSServiceImpl implements TSIService.Iface {
               encodings,
               compressors,
               req.measurementAlias);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1824,15 +1827,16 @@ public class TSServiceImpl implements TSIService.Iface {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            session,
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1863,7 +1867,7 @@ public class TSServiceImpl implements TSIService.Iface {
       CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
       for (int i = 0; i < req.paths.size(); i++) {
         plan.setPath(new PartialPath(req.paths.get(i)));
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
         if (status != null) {
           // not authorized
           multiPlan.getResults().put(i, status);
@@ -1911,8 +1915,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1922,7 +1927,7 @@ public class TSServiceImpl implements TSIService.Iface {
         pathList.add(new PartialPath(path));
       }
       DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1934,28 +1939,27 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    return SESSION_MANAGER.requestStatementId(session);
   }
 
   @Override
   public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.getName());
+        AUDIT_LOGGER.debug("Session-{} create schema template {}", session, req.getName());
       }
 
       CreateTemplatePlan plan;
       // Construct plan from serialized request
       ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
       plan = CreateTemplatePlan.deserializeFromReq(buffer);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (Exception e) {
@@ -1966,6 +1970,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     int size = req.getMeasurementsSize();
     String[] measurements = new String[size];
     TSDataType[] dataTypes = new TSDataType[size];
@@ -1982,20 +1987,22 @@ public class TSServiceImpl implements TSIService.Iface {
     AppendTemplatePlan plan =
         new AppendTemplatePlan(
             req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
   @Override
   public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     PruneTemplatePlan plan =
         new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
   @Override
   public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     TSQueryTemplateResp resp = new TSQueryTemplateResp();
     try {
       String path;
@@ -2044,21 +2051,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getTemplateName(),
           req.getPrefixPath());
     }
 
     try {
       SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2067,21 +2075,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} unset schema template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTemplateName());
     }
 
     try {
       UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2091,23 +2100,21 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus unsetUsingTemplate(long sessionId, String templateName, String prefixPath)
       throws TException {
-    TSStatus loginStatus = checkLoginStatus(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "Session-{} unset using schema template {} on {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          templateName,
-          prefixPath);
+          "Session-{} unset using schema template {} on {}", session, templateName, prefixPath);
     }
 
     try {
       DeactivateTemplatePlan plan =
           new DeactivateTemplatePlan(templateName, new PartialPath(prefixPath));
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (MetadataException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2116,20 +2123,19 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setUsingTemplate(TSSetUsingTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "Session-{} create timeseries of schema template on path {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getDstPath());
+          "Session-{} create timeseries of schema template on path {}", session, req.getDstPath());
     }
 
     try {
       ActivateTemplatePlan plan = new ActivateTemplatePlan(new PartialPath(req.getDstPath()));
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2138,19 +2144,17 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "Session-{} drop schema template {}.",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getTemplateName());
+      AUDIT_LOGGER.debug("Session-{} drop schema template {}.", session, req.getTemplateName());
     }
 
     DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -2164,6 +2168,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     PhysicalPlan physicalPlan;
     try {
       ByteBuffer planBuffer = req.physicalPlan;