You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2022/10/23 15:45:29 UTC

[iotdb] branch feature-client-session-0.13 created (now 32178b0e5e)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 32178b0e5e use threadLocal<ClientSession> to replace session id

This branch includes the following new commits:

     new 198068730d update jdbc and client-py's version to release 0.13.3 (#7687)
     new 32178b0e5e use threadLocal<ClientSession> to replace session id

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: use threadLocal<ClientSession> to replace session id

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32178b0e5e4560643fa1bf4718b911b9a04e94b8
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sun Oct 23 23:45:10 2022 +0800

    use threadLocal<ClientSession> to replace session id
---
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |   4 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   9 +-
 external-api/pom.xml                               |   8 +
 .../external/api/thrift/JudgableServerContext.java |  41 +++
 .../external/api/thrift/ServerContextFactory.java  |  23 ++
 .../iotdb/db/protocol/mqtt/PublishHandler.java     |  35 ++-
 .../iotdb/db/qp/physical/crud/GroupByTimePlan.java |   2 +-
 .../apache/iotdb/db/qp/utils/DateTimeUtils.java    |   2 +-
 .../iotdb/db/query/control/SessionManager.java     | 153 +++++----
 .../db/query/control/SessionTimeoutManager.java    |  33 +-
 .../query/control/clientsession/ClientSession.java |  41 +++
 .../control/clientsession/IClientSession.java      | 101 ++++++
 .../control/clientsession/MqttClientSession.java   |  46 +++
 .../dataset/groupby/GroupByEngineDataSet.java      |   2 +-
 .../apache/iotdb/db/query/executor/fill/IFill.java |   2 +-
 .../iotdb/db/service/basic/ServiceProvider.java    |  75 +++--
 .../thrift/TThreadPoolServerWithContext.java       | 150 +++++++++
 .../db/service/thrift/ThriftServiceThread.java     |   3 +-
 .../thrift/handler/RPCServiceThriftHandler.java    |  67 +++-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  42 ++-
 .../db/service/thrift/impl/TSServiceImpl.java      | 341 +++++++++++----------
 22 files changed, 862 insertions(+), 320 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 1cbed9d0af..9de6b5608d 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -578,7 +578,7 @@ public abstract class AbstractCli {
    * @param columnCount the number of column
    * @param resultSetMetaData jdbc resultSetMetaData
    * @param zoneId your time zone
-   * @return List<List<String>> result
+   * @return {@literal List<List<String>> result}
    * @throws SQLException throw exception
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index f0495c43f8..c966382104 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -641,8 +641,8 @@ public class ImportCsv extends AbstractCsvTool {
    * read data from the CSV file
    *
    * @param path
-   * @return
-   * @throws IOException
+   * @return CSVParser csv parser
+   * @throws IOException when reading the csv file failed.
    */
   private static CSVParser readCsvFile(String path) throws IOException {
     return CSVFormat.Builder.create(CSVFormat.DEFAULT)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index f7c5e95854..68e4c08a2e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.cluster.server;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.query.manage.ClusterSessionManager;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics;
 import org.apache.iotdb.db.service.thrift.ThriftService;
@@ -79,7 +81,12 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
               config.getThriftServerAwaitTimeForStopService(),
-              new RPCServiceThriftHandler(impl),
+              new RPCServiceThriftHandler(impl) {
+                @Override
+                protected SessionManager getSessionManager() {
+                  return ClusterSessionManager.getInstance();
+                }
+              },
               IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
diff --git a/external-api/pom.xml b/external-api/pom.xml
index 086c6a08b4..28ec1eed74 100644
--- a/external-api/pom.xml
+++ b/external-api/pom.xml
@@ -28,6 +28,14 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>external-api</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${thrift.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
     <profiles>
         <profile>
             <id>get-jar-with-dependencies</id>
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
new file mode 100644
index 0000000000..5948804a57
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+import org.apache.thrift.server.ServerContext;
+
+public interface JudgableServerContext extends ServerContext {
+
+  /**
+   * this method will be called when a client connects to the IoTDB server.
+   *
+   * @return true / false
+   */
+  public boolean authorised();
+
+  @Override
+  default <T> T unwrap(Class<T> iface) {
+    return null;
+  }
+
+  @Override
+  default boolean isWrapperFor(Class<?> iface) {
+    return false;
+  };
+}
diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
new file mode 100644
index 0000000000..b6788b376a
--- /dev/null
+++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.external.api.thrift;
+
+public interface ServerContextFactory {
+  JudgableServerContext newServerContext();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 892e17cde5..8c6a1d9ad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -46,7 +46,8 @@ import java.util.concurrent.ConcurrentHashMap;
 public class PublishHandler extends AbstractInterceptHandler {
 
   private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
-  private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
+      new ConcurrentHashMap<>();
   private static final boolean isEnableOperationSync =
       IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
   private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
@@ -68,15 +69,17 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onConnect(InterceptConnectMessage msg) {
-    if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
       try {
-        BasicOpenSessionResp basicOpenSessionResp =
-            serviceProvider.openSession(
-                msg.getUsername(),
-                new String(msg.getPassword()),
-                ZoneId.systemDefault().toString(),
-                TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
-        clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+        MqttClientSession session = new MqttClientSession(msg.getClientID());
+        // TODO should we put this session into a ThreadLocal in SessionManager?
+        serviceProvider.login(
+            session,
+            msg.getUsername(),
+            new String(msg.getPassword()),
+            ZoneId.systemDefault().toString(),
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        clientIdToSessionMap.put(msg.getClientID(), session);
       } catch (TException e) {
         throw new RuntimeException(e);
       }
@@ -85,19 +88,19 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onDisconnect(InterceptDisconnectMessage msg) {
-    Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
-    if (null != sessionId) {
-      serviceProvider.closeSession(sessionId);
+    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+    if (null != session) {
+      serviceProvider.closeSession(session);
     }
   }
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
     String clientId = msg.getClientID();
-    if (!clientIdToSessionIdMap.containsKey(clientId)) {
+    if (!clientIdToSessionMap.containsKey(clientId)) {
       return;
     }
-    long sessionId = clientIdToSessionIdMap.get(clientId);
+    MqttClientSession session = clientIdToSessionMap.get(clientId);
     ByteBuf payload = msg.getPayload();
     String topic = msg.getTopicName();
     String username = msg.getUsername();
@@ -132,7 +135,7 @@ public class PublishHandler extends AbstractInterceptHandler {
                 event.getTimestamp(),
                 event.getMeasurements().toArray(new String[0]),
                 event.getValues().toArray(new String[0]));
-        TSStatus tsStatus = serviceProvider.checkAuthority(plan, sessionId);
+        TSStatus tsStatus = serviceProvider.checkAuthority(plan, session);
         if (tsStatus != null) {
           LOG.warn(tsStatus.message);
         } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index 1fab54b56c..bfb8785eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan {
               plan.getEndTime(),
               plan.isSlidingStepByMonth(),
               plan.isIntervalByMonth(),
-              SessionManager.getInstance().getCurrSessionTimeZone())));
+              SessionManager.getInstance().getSessionTimeZone())));
     } else {
       return new GlobalTimeExpression(
           new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
index b6901726bd..eb25720a2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java
@@ -584,7 +584,7 @@ public class DateTimeUtils {
           res *= 30 * 86_400_000L;
         } else {
           Calendar calendar = Calendar.getInstance();
-          calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+          calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
           calendar.setTimeInMillis(currentTime);
           calendar.add(Calendar.MONTH, (int) (value));
           res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 7e1861eeed..061f635d81 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
@@ -34,69 +35,110 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Session Manager is for managing active sessions. It will be used by both Thrift based services
+ * (i.e., TSServiceImpl and InfluxdbService) and Mqtt based service. <br>
+ * Thrift based services are client-thread model, i.e., each client has a thread. So we can use
+ * threadLocal for such services.<br>
+ * However, Mqtt based service uses message-thread model, i.e., each message has a short thread.
+ * So, we cannot use threadLocal for such services.
+ */
 public class SessionManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
 
   // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+  /** currSession can be only used in client-thread model services. */
+  private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>();
+
+  // we keep this sessionIdGenerator only to stay compatible with v0.13
+  @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong();
 
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
   // The statementId is unique in one IoTDB instance.
   private final AtomicLong statementIdGenerator = new AtomicLong();
 
   // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+  private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>();
   // (statementId -> Set(queryId))
   private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
   // (queryId -> QueryDataSet)
   private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
 
-  // (sessionId -> client version number)
-  private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
-      new ConcurrentHashMap<>();
-
   protected SessionManager() {
     // singleton
   }
 
-  public Long getCurrSessionId() {
-    return currSessionId.get();
-  }
-
-  public void removeCurrSessionId() {
-    currSessionId.remove();
+  /**
+   * this method can be only used in client-thread model.
+   *
+   * @return
+   */
+  public IClientSession getCurrSession() {
+    return currSession.get();
   }
 
-  public TimeZone getCurrSessionTimeZone() {
-    if (getCurrSessionId() != null) {
-      return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+  public TimeZone getSessionTimeZone() {
+    IClientSession session = currSession.get();
+    if (session != null) {
+      return session.getTimeZone();
     } else {
       // only used for test
       return TimeZone.getTimeZone("+08:00");
     }
   }
 
-  public long requestSessionId(
-      String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
-    long sessionId = sessionIdGenerator.incrementAndGet();
-
-    currSessionId.set(sessionId);
-    sessionIdToUsername.put(sessionId, username);
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
-    sessionIdToClientVersion.put(sessionId, clientVersion);
+  /**
+   * this method can be only used in client-thread model. But, in message-thread model based
+   * service, calling this method has no side effect. <br>
+   * MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this
+   * method in thrift's event handler.
+   *
+   * @return
+   */
+  public void removeCurrSession() {
+    currSession.remove();
+  }
 
-    return sessionId;
+  /**
+   * this method can be only used in client-thread model. Do not use this method in message-thread
+   * model based service.
+   *
+   * @param session
+   * @return false if the session has been initialized.
+   */
+  public boolean registerSession(IClientSession session) {
+    if (this.currSession.get() != null) {
+      LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug.");
+      return false;
+    }
+    this.currSession.set(session);
+    return true;
   }
 
-  public boolean releaseSessionResource(long sessionId) {
-    sessionIdToZoneId.remove(sessionId);
-    sessionIdToClientVersion.remove(sessionId);
+  /**
+   * must be called after registerSession(); it marks the session as logged in.
+   *
+   * @param username
+   * @param zoneId
+   * @param clientVersion
+   */
+  public void supplySession(
+      IClientSession session,
+      String username,
+      String zoneId,
+      IoTDBConstant.ClientVersion clientVersion) {
+    session.setId(sessionIdGenerator.incrementAndGet());
+    session.setUsername(username);
+    session.setZoneId(ZoneId.of(zoneId));
+    session.setClientVersion(clientVersion);
+    session.setLogin(true);
+  }
 
-    Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
+  /**
+   * @param session
+   * @return true if releasing successfully, false otherwise (e.g., the session does not exist)
+   */
+  public boolean releaseSessionResource(IClientSession session) {
+    Set<Long> statementIdSet = sessionToStatementId.remove(session);
     if (statementIdSet != null) {
       for (Long statementId : statementIdSet) {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
@@ -106,34 +148,41 @@ public class SessionManager {
           }
         }
       }
+      return true;
     }
-
-    return sessionIdToUsername.remove(sessionId) != null;
+    // TODO if there is no statement for the session, how to return (true or false?)
+    return false;
   }
 
-  public long getSessionIdByQueryId(long queryId) {
+  /**
+   * @param queryId
+   * @return null if not found. (e.g., the client session is closed already. TODO: do we really have
+   *     this case?)
+   */
+  public IClientSession getSessionIdByQueryId(long queryId) {
     // TODO: make this more efficient with a queryId -> sessionId map
     for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) {
       if (statementToQueries.getValue().contains(queryId)) {
-        for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) {
+        for (Map.Entry<IClientSession, Set<Long>> sessionToStatements :
+            sessionToStatementId.entrySet()) {
           if (sessionToStatements.getValue().contains(statementToQueries.getKey())) {
             return sessionToStatements.getKey();
           }
         }
       }
     }
-    return -1;
+    return null;
   }
 
-  public long requestStatementId(long sessionId) {
+  public long requestStatementId(IClientSession session) {
     long statementId = statementIdGenerator.incrementAndGet();
-    sessionIdToStatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+    sessionToStatementId
+        .computeIfAbsent(session, s -> new CopyOnWriteArraySet<>())
         .add(statementId);
     return statementId;
   }
 
-  public void closeStatement(long sessionId, long statementId) {
+  public void closeStatement(IClientSession session, long statementId) {
     Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
     if (queryIdSet != null) {
       for (Long queryId : queryIdSet) {
@@ -141,8 +190,8 @@ public class SessionManager {
       }
     }
 
-    if (sessionIdToStatementId.containsKey(sessionId)) {
-      sessionIdToStatementId.get(sessionId).remove(statementId);
+    if (sessionToStatementId.containsKey(session)) {
+      sessionToStatementId.get(session).remove(statementId);
     }
   }
 
@@ -176,18 +225,6 @@ public class SessionManager {
     }
   }
 
-  public String getUsername(Long sessionId) {
-    return sessionIdToUsername.get(sessionId);
-  }
-
-  public ZoneId getZoneId(Long sessionId) {
-    return sessionIdToZoneId.get(sessionId);
-  }
-
-  public void setTimezone(Long sessionId, String zone) {
-    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
-  }
-
   public boolean hasDataset(Long queryId) {
     return queryIdToDataSet.containsKey(queryId);
   }
@@ -211,10 +248,6 @@ public class SessionManager {
     }
   }
 
-  public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) {
-    return sessionIdToClientVersion.get(sessionId);
-  }
-
   public static SessionManager getInstance() {
     return SessionManagerHelper.INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
index 1f179c5e40..c9ff8efb02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 
 import org.slf4j.Logger;
@@ -37,7 +38,7 @@ public class SessionTimeoutManager {
   private static final long SESSION_TIMEOUT =
       IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold();
 
-  private Map<Long, Long> sessionIdToLastActiveTime;
+  private Map<IClientSession, Long> sessionToLastActiveTime;
   private ScheduledExecutorService executorService;
 
   private SessionTimeoutManager() {
@@ -45,7 +46,7 @@ public class SessionTimeoutManager {
       return;
     }
 
-    this.sessionIdToLastActiveTime = new ConcurrentHashMap<>();
+    this.sessionToLastActiveTime = new ConcurrentHashMap<>();
     this.executorService =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager");
 
@@ -59,37 +60,43 @@ public class SessionTimeoutManager {
         TimeUnit.MILLISECONDS);
   }
 
-  public void register(long sessionId) {
+  public void register(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis());
+    sessionToLastActiveTime.put(session, System.currentTimeMillis());
   }
 
-  public boolean unregister(long sessionId) {
+  /**
+   * unregister the session and release all its query resources.
+   *
+   * @param session
+   * @return true if removing successfully, false otherwise (e.g., the session does not exist)
+   */
+  public boolean unregister(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
-      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId);
+      return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session);
     }
 
-    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) {
-      return sessionIdToLastActiveTime.remove(sessionId) != null;
+    if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) {
+      return sessionToLastActiveTime.remove(session) != null;
     }
 
     return false;
   }
 
-  public void refresh(long sessionId) {
+  public void refresh(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return;
     }
 
-    sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis());
+    sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis());
   }
 
   private void cleanup() {
     long currentTime = System.currentTimeMillis();
-    sessionIdToLastActiveTime.entrySet().stream()
+    sessionToLastActiveTime.entrySet().stream()
         .filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime)
         .forEach(
             entry -> {
@@ -106,11 +113,11 @@ public class SessionTimeoutManager {
     return SessionTimeoutManagerHelper.INSTANCE;
   }
 
-  public boolean isSessionAlive(long sessionId) {
+  public boolean isSessionAlive(IClientSession session) {
     if (SESSION_TIMEOUT == 0) {
       return true;
     }
-    return sessionIdToLastActiveTime.containsKey(sessionId);
+    return sessionToLastActiveTime.containsKey(session);
   }
 
   private static class SessionTimeoutManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
new file mode 100644
index 0000000000..9edcc6f077
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import java.net.InetSocketAddress;
+
+/** Client Session is the only identity for a connection. */
+public class ClientSession extends IClientSession {
+
+  InetSocketAddress clientNet;
+
+  public ClientSession(InetSocketAddress clientNet) {
+    this.clientNet = clientNet;
+  }
+
+  @Override
+  public String getClientAddress() {
+    return clientNet.getHostName();
+  }
+
+  @Override
+  int getClientPort() {
+    return clientNet.getPort();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
new file mode 100644
index 0000000000..b3ae42e68b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
+
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/**
+ * Base type for the state kept about one client: protocol version, time zone, user name and login
+ * flag. Subclasses provide the address and port used to describe the client.
+ */
+public abstract class IClientSession {
+
+  /** Numeric id that is kept only for compatibility with v0.13. */
+  @Deprecated long id;
+
+  ClientVersion clientVersion;
+
+  ZoneId zoneId;
+
+  // TODO: why some Statement Plans use timeZone while others use ZoneId?
+  TimeZone timeZone;
+
+  String username;
+
+  boolean login = false;
+
+  abstract String getClientAddress();
+
+  abstract int getClientPort();
+
+  public ClientVersion getClientVersion() {
+    return clientVersion;
+  }
+
+  public void setClientVersion(ClientVersion clientVersion) {
+    this.clientVersion = clientVersion;
+  }
+
+  public ZoneId getZoneId() {
+    return zoneId;
+  }
+
+  /** Stores the zone id and derives {@code timeZone} from it, so both describe the same zone. */
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+    timeZone = TimeZone.getTimeZone(zoneId);
+  }
+
+  public TimeZone getTimeZone() {
+    return this.timeZone;
+  }
+
+  /** Stores the time zone and derives {@code zoneId} from it, so both describe the same zone. */
+  public void setTimeZone(TimeZone timeZone) {
+    this.timeZone = timeZone;
+    zoneId = timeZone.toZoneId();
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public boolean isLogin() {
+    return this.login;
+  }
+
+  public void setLogin(boolean login) {
+    this.login = login;
+  }
+
+  @Deprecated
+  public long getId() {
+    return this.id;
+  }
+
+  @Deprecated
+  public void setId(long id) {
+    this.id = id;
+  }
+
+  /** Renders the session as {@code <id>-<address>:<port>}. */
+  @Override
+  public String toString() {
+    return String.format("%d-%s:%d", getId(), getClientAddress(), getClientPort());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
new file mode 100644
index 0000000000..b458cb22f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control.clientsession;
+
+/** Client session that is described by a client id instead of a network address. */
+public class MqttClientSession extends IClientSession {
+
+  String clientID;
+
+  public MqttClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  /** The client id is reported as the address of this session. */
+  @Override
+  public String getClientAddress() {
+    return this.clientID;
+  }
+
+  /** No port is recorded for this kind of session, so this is always 0. */
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  public String getClientID() {
+    return this.clientID;
+  }
+
+  /** Renders the session as {@code <id>-<clientID>}. */
+  @Override
+  public String toString() {
+    return String.format("%d-%s", getId(), getClientID());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index ccd374b9b3..ed54e5920a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -209,7 +209,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
    */
   public static long calcIntervalByMonth(long startTime, long numMonths) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     boolean isLastDayOfMonth =
         calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 5fbb0575dd..0ccfb23e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -143,7 +143,7 @@ public abstract class IFill {
 
   protected long slideMonth(long startTime, int monthNum) {
     Calendar calendar = Calendar.getInstance();
-    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
+    calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     calendar.add(Calendar.MONTH, monthNum);
     return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 0806dec7a7..0987d2d97f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.tracing.TracingManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -58,6 +59,10 @@ import java.sql.SQLException;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 
+/**
+ * There is only one ServiceProvider instance per IoTDB instance. It is shared by services based on
+ * the client-thread model and by services based on the message-thread model (e.g., MQTT).
+ */
 public abstract class ServiceProvider {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
@@ -105,14 +110,13 @@ public abstract class ServiceProvider {
    *
    * @return true: If logged in; false: If not logged in
    */
-  public boolean checkLogin(long sessionId) {
-    Long currSessionId = SESSION_MANAGER.getCurrSessionId();
-    boolean isLoggedIn = currSessionId != null && currSessionId == sessionId;
+  public boolean checkLogin(IClientSession session) {
+    boolean isLoggedIn = session != null && session.isLogin();
     if (!isLoggedIn) {
       LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
       return false;
     } else {
-      SessionTimeoutManager.getInstance().refresh(sessionId);
+      SessionTimeoutManager.getInstance().refresh(session);
     }
     return isLoggedIn;
   }
@@ -120,11 +124,11 @@ public abstract class ServiceProvider {
   /**
    * Check whether current session is timeout.
    *
-   * @param sessionId Session id.
+   * @param session clientSession.
    * @return true: If session timeout; false: If not session timeout.
    */
-  public boolean checkSessionTimeout(long sessionId) {
-    if (!SessionTimeoutManager.getInstance().isSessionAlive(sessionId)) {
+  public boolean checkSessionTimeout(IClientSession session) {
+    if (!SessionTimeoutManager.getInstance().isSessionAlive(session)) {
       return true;
     }
     return false;
@@ -143,9 +147,9 @@ public abstract class ServiceProvider {
         username, plan.getAuthPaths(), plan.getOperatorType(), targetUser);
   }
 
-  public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+  public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) {
     try {
-      if (!checkAuthorization(plan, SESSION_MANAGER.getUsername(sessionId))) {
+      if (!checkAuthorization(plan, session.getUsername())) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation, please add privilege "
@@ -162,7 +166,8 @@ public abstract class ServiceProvider {
     return null;
   }
 
-  public BasicOpenSessionResp openSession(
+  public BasicOpenSessionResp login(
+      IClientSession session,
       String username,
       String password,
       String zoneId,
@@ -187,7 +192,6 @@ public abstract class ServiceProvider {
       loginMessage = e.getMessage();
     }
 
-    long sessionId = -1;
     if (status) {
       // check the version compatibility
       boolean compatible = checkCompatibility(tsProtocolVersion);
@@ -195,74 +199,69 @@ public abstract class ServiceProvider {
         openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
         openSessionResp.setMessage(
             "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION);
-        return openSessionResp.sessionId(sessionId);
+        return openSessionResp.sessionId(-1);
       }
 
       openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       openSessionResp.setMessage("Login successfully");
 
-      sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
-
+      SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion);
       LOGGER.info(
           "{}: Login status: {}. User : {}, opens Session-{}",
           IoTDBConstant.GLOBAL_DB_NAME,
           openSessionResp.getMessage(),
           username,
-          sessionId);
+          session);
     } else {
       openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed.");
       openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode());
-
-      sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion);
       AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username);
+      // TODO we should close this connection ASAP, otherwise there will be DDoS.
     }
-
-    SessionTimeoutManager.getInstance().register(sessionId);
-    return openSessionResp.sessionId(sessionId);
+    SessionTimeoutManager.getInstance().register(session);
+    return openSessionResp.sessionId(session == null ? -1 : session.getId());
   }
 
-  public BasicOpenSessionResp openSession(
-      String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
+  public BasicOpenSessionResp login(
+      IClientSession session,
+      String username,
+      String password,
+      String zoneId,
+      TSProtocolVersion tsProtocolVersion)
       throws TException {
-    return openSession(
-        username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
+    return login(
+        session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12);
   }
 
-  public boolean closeSession(long sessionId) {
-    AUDIT_LOGGER.info("Session-{} is closing", sessionId);
-
-    SESSION_MANAGER.removeCurrSessionId();
-
-    return SessionTimeoutManager.getInstance().unregister(sessionId);
+  public boolean closeSession(IClientSession session) {
+    AUDIT_LOGGER.info("Session-{} is closing", session);
+    return SessionTimeoutManager.getInstance().unregister(session);
   }
 
   public TSStatus closeOperation(
-      long sessionId,
+      IClientSession session,
       long queryId,
       long statementId,
       boolean haveStatementId,
       boolean haveSetQueryId) {
-    if (!checkLogin(sessionId)) {
+    if (!checkLogin(session)) {
       return RpcUtils.getStatus(
           TSStatusCode.NOT_LOGIN_ERROR,
           "Log in failed. Either you are not authorized or the session has timed out.");
     }
-    if (checkSessionTimeout(sessionId)) {
+    if (checkSessionTimeout(session)) {
       return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout");
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "{}: receive close operation from Session {}",
-          IoTDBConstant.GLOBAL_DB_NAME,
-          SESSION_MANAGER.getCurrSessionId());
+          "{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, session);
     }
-
     try {
       if (haveStatementId) {
         if (haveSetQueryId) {
           SESSION_MANAGER.closeDataset(statementId, queryId);
         } else {
-          SESSION_MANAGER.closeStatement(sessionId, statementId);
+          SESSION_MANAGER.closeStatement(session, statementId);
         }
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
new file mode 100644
index 0000000000..8d36047509
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java
@@ -0,0 +1,150 @@
+package org.apache.iotdb.db.service.thrift;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A {@link TThreadPoolServer} with its own accept loop and worker. For every accepted connection
+ * the worker calls the registered {@link TServerEventHandler} on the worker thread: {@code
+ * createContext} once before the first message, {@code processContext} before each message, and
+ * {@code deleteContext} when the connection ends.
+ */
+public class TThreadPoolServerWithContext extends TThreadPoolServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServerWithContext.class);
+
+  public TThreadPoolServerWithContext(Args args) {
+    super(args);
+  }
+
+  /** Accepts connections until the server is stopped and submits one worker per connection. */
+  @Override
+  protected void execute() {
+    while (!stopped_) {
+      try {
+        TTransport client = serverTransport_.accept();
+        try {
+          getExecutorService().execute(new WorkerProcess(client));
+        } catch (RejectedExecutionException ree) {
+          if (!stopped_) {
+            LOGGER.warn(
+                "ThreadPool is saturated with incoming requests. Closing latest connection.");
+          }
+          // the connection was never handed to a worker, so it has to be closed here
+          client.close();
+        }
+      } catch (TTransportException ttx) {
+        if (!stopped_) {
+          LOGGER.warn("Transport error occurred during acceptance of message", ttx);
+        }
+      }
+    }
+  }
+
+  // The following code is copied from TThreadPoolServer.WorkerProcess
+  // and we add additional processing for connectionContext
+
+  private class WorkerProcess implements Runnable {
+
+    /** Client that this services. */
+    private final TTransport client_;
+
+    /**
+     * Default constructor.
+     *
+     * @param client Transport to process
+     */
+    private WorkerProcess(TTransport client) {
+      client_ = client;
+    }
+
+    /** Loops on processing a client forever */
+    @Override
+    public void run() {
+      TProcessor processor = null;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+
+      Optional<TServerEventHandler> eventHandler = Optional.empty();
+      ServerContext connectionContext = null;
+
+      try {
+        processor = processorFactory_.getProcessor(client_);
+        inputTransport = inputTransportFactory_.getTransport(client_);
+        outputTransport = outputTransportFactory_.getTransport(client_);
+        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+
+        eventHandler = Optional.ofNullable(getEventHandler());
+
+        if (eventHandler.isPresent()) {
+          connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
+        }
+
+        while (true) {
+          if (Thread.currentThread().isInterrupted()) {
+            LOGGER.debug("WorkerProcess requested to shutdown");
+            break;
+          }
+          if (eventHandler.isPresent()) {
+            eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
+          }
+          // This process cannot be interrupted by Interrupting the Thread. This
+          // will return once a message has been processed or the socket timeout
+          // has elapsed, at which point it will return and check the interrupt
+          // state of the thread.
+          processor.process(inputProtocol, outputProtocol);
+        }
+      } catch (Exception x) {
+        LOGGER.debug("Error processing request", x);
+
+        // We'll usually receive RuntimeException types here
+        // Need to unwrap to ascertain real causing exception before we choose to ignore
+        // Ignore err-logging all transport-level/type exceptions
+        if (!isIgnorableException(x)) {
+          // Log the exception at error level and continue
+          LOGGER.error(
+              (x instanceof TException ? "Thrift " : "")
+                  + "Error occurred during processing of message.",
+              x);
+        }
+      } finally {
+        // connectionContext is still null here if createContext was never reached or failed
+        if (eventHandler.isPresent()) {
+          eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
+        }
+        if (inputTransport != null) {
+          inputTransport.close();
+        }
+        if (outputTransport != null) {
+          outputTransport.close();
+        }
+        if (client_.isOpen()) {
+          client_.close();
+        }
+      }
+    }
+
+    /**
+     * Tells whether the exception, or its direct cause, is a transport exception of type
+     * END_OF_FILE or TIMED_OUT; such exceptions are not logged at error level.
+     */
+    private boolean isIgnorableException(Exception x) {
+      TTransportException tTransportException = null;
+
+      if (x instanceof TTransportException) {
+        tTransportException = (TTransportException) x;
+      } else if (x.getCause() instanceof TTransportException) {
+        tTransportException = (TTransportException) x.getCause();
+      }
+
+      if (tTransportException != null) {
+        switch (tTransportException.getType()) {
+          case TTransportException.END_OF_FILE:
+          case TTransportException.TIMED_OUT:
+            return true;
+        }
+      }
+      return false;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 53cf755ad0..edf98d1774 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -154,7 +154,8 @@ public class ThriftServiceThread extends Thread {
       serverTransport = openTransport(bindAddress, port);
       TThreadPoolServer.Args poolArgs =
           initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
-      poolServer = new TThreadPoolServer(poolArgs);
+      poolServer = new TThreadPoolServerWithContext(poolArgs);
+      logger.warn("注册EventHandler");
       poolServer.setServerEventHandler(serverEventHandler);
     } catch (TTransportException e) {
       catchFailedInitialization(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
index 6761c0d1c9..e7e8c3ce4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java
@@ -16,37 +16,92 @@
  */
 package org.apache.iotdb.db.service.thrift.handler;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.ClientSession;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
+import org.apache.iotdb.external.api.thrift.JudgableServerContext;
+import org.apache.iotdb.external.api.thrift.ServerContextFactory;
+import org.apache.iotdb.rpc.TElasticFramedTransport;
 
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class RPCServiceThriftHandler implements TServerEventHandler {
+  private static final Logger logger = LoggerFactory.getLogger(RPCServiceThriftHandler.class);
   private TSServiceImpl serviceImpl;
   private AtomicLong thriftConnectionNumber = new AtomicLong(0);
 
+  private ServerContextFactory factory = null;
+
   public RPCServiceThriftHandler(TSServiceImpl serviceImpl) {
     this.serviceImpl = serviceImpl;
     MetricService.getInstance()
         .addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber));
+    // optional: class name of a ServerContextFactory given in the customized properties
+    String factoryClass =
+        IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getCustomizedProperties()
+            .getProperty("rpc_service_thrift_handler_context_class");
+    if (factoryClass != null) {
+      try {
+        factory =
+            (ServerContextFactory)
+                Class.forName(factoryClass).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        // the class may be missing, of the wrong type, or not instantiable; keep the cause in
+        // the log and continue without a factory
+        logger.warn(
+            "configuration announced ServerContextFactory {}, but it cannot be loaded",
+            factoryClass,
+            e);
+        factory = null;
+      }
+    }
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+  public ServerContext createContext(TProtocol in, TProtocol out) {
     thriftConnectionNumber.incrementAndGet();
+    // unwrap the framed transport to reach the socket of this connection
+    Socket socket =
+        ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket();
+    InetSocketAddress remote = (InetSocketAddress) socket.getRemoteSocketAddress();
+    logger.debug(
+        "connection created, local: {}, remote: {}", socket.getLocalSocketAddress(), remote);
+    // register a session for this connection; deleteContext removes it again
+    getSessionManager().registerSession(new ClientSession(remote));
+    if (factory != null) {
+      JudgableServerContext context = factory.newServerContext();
+      // TODO
+      return context;
+    }
+
     return null;
   }
 
   @Override
   public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+    logger.debug("connection removed");
     // release query resources.
     serviceImpl.handleClientExit();
     thriftConnectionNumber.decrementAndGet();
+    // drop the session that createContext registered for this connection
+    getSessionManager().removeCurrSession();
   }
 
   @Override
@@ -58,4 +113,14 @@ public class RPCServiceThriftHandler implements TServerEventHandler {
   public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
     // nothing
   }
+
+  /**
+   * Returns the SessionManager used by this handler. <br>
+   * In v0.13, cluster mode uses a different SessionManager instance; presumably this method is
+   * protected so that a subclass can return that instance instead.
+   *
+   * @return the instance returned by {@code SessionManager.getInstance()}
+   */
+  protected SessionManager getSessionManager() {
+    return SessionManager.getInstance();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
index 7febf56420..e2467d0d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
@@ -59,6 +61,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   private final InfluxDBMetaManager metaManager;
 
+  private SessionManager sessionManager = SessionManager.getInstance();
+
   public InfluxDBServiceImpl() {
     serviceProvider = IoTDB.serviceProvider;
     metaManager = InfluxDBMetaManager.getInstance();
@@ -66,9 +70,14 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
+    IClientSession session = sessionManager.getCurrSession();
     BasicOpenSessionResp basicOpenSessionResp =
-        serviceProvider.openSession(
-            req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        serviceProvider.login(
+            session,
+            req.username,
+            req.password,
+            req.zoneId,
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
     return new TSOpenSessionResp()
         .setStatus(
             RpcUtils.getInfluxDBStatus(
@@ -78,15 +87,17 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
+    IClientSession session = sessionManager.getCurrSession();
     return new TSStatus(
-        !serviceProvider.closeSession(req.sessionId)
+        !serviceProvider.closeSession(session)
             ? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
   @Override
   public TSStatus writePoints(TSWritePointsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.sessionId);
+    IClientSession session = sessionManager.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
@@ -97,7 +108,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
       IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager);
       try {
         InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan();
-        TSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId);
+        TSStatus tsStatus = executeNonQueryPlan(session, plan, req.sessionId);
         if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
           executeCode = tsStatus.getCode();
@@ -118,11 +129,11 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
     return tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
 
-  private TSStatus checkLoginStatus(long sessionId) {
-    if (!serviceProvider.checkLogin(sessionId)) {
+  private TSStatus checkLoginStatus(IClientSession session) {
+    if (!serviceProvider.checkLogin(session)) {
       return getNotLoggedInStatus();
     }
-    if (serviceProvider.checkSessionTimeout(sessionId)) {
+    if (serviceProvider.checkSessionTimeout(session)) {
       return RpcUtils.getInfluxDBStatus(
           TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout");
     }
@@ -131,14 +142,15 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
 
   @Override
   public TSStatus createDatabase(TSCreateDatabaseReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.sessionId);
+    IClientSession session = sessionManager.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     try {
       SetStorageGroupPlan setStorageGroupPlan =
           new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase()));
-      return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId());
+      return executeNonQueryPlan(session, setStorageGroupPlan, req.getSessionId());
     } catch (IllegalPathException
         | QueryProcessException
         | StorageGroupNotSetException
@@ -152,9 +164,9 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
   }
 
   public void handleClientExit() {
-    Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      closeSession(new TSCloseSessionReq(sessionId));
+    IClientSession session = sessionManager.getCurrSession();
+    if (session != null) {
+      closeSession(new TSCloseSessionReq(session.getId()));
     }
   }
 
@@ -164,10 +176,10 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface {
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
 
-  private TSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId)
+  private TSStatus executeNonQueryPlan(IClientSession session, PhysicalPlan plan, long sessionId)
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     org.apache.iotdb.service.rpc.thrift.TSStatus status =
-        serviceProvider.checkAuthority(plan, sessionId);
+        serviceProvider.checkAuthority(plan, session);
     if (status == null) {
       status =
           serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7e862e532f..5f0793562f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -178,7 +179,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
     private PhysicalPlan plan;
     private final long queryStartTime;
-    private final long sessionId;
+    private final IClientSession session;
     private final String statement;
     private final long statementId;
     private final long timeout;
@@ -194,7 +195,7 @@ public class TSServiceImpl implements TSIService.Iface {
     public QueryTask(
         PhysicalPlan plan,
         long queryStartTime,
-        long sessionId,
+        IClientSession session,
         String statement,
         long statementId,
         long timeout,
@@ -203,7 +204,7 @@ public class TSServiceImpl implements TSIService.Iface {
         boolean enableRedirectQuery) {
       this.plan = plan;
       this.queryStartTime = queryStartTime;
-      this.sessionId = sessionId;
+      this.session = session;
       this.statement = statement;
       this.statementId = statementId;
       this.timeout = timeout;
@@ -214,11 +215,11 @@ public class TSServiceImpl implements TSIService.Iface {
 
     @Override
     public TSExecuteStatementResp call() throws Exception {
-      String username = SESSION_MANAGER.getUsername(sessionId);
+      String username = session.getUsername();
       plan.setLoginUserName(username);
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
-      AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement);
+      AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement);
 
       final long queryId = SESSION_MANAGER.requestQueryId(statementId, true);
       QueryContext context =
@@ -252,13 +253,13 @@ public class TSServiceImpl implements TSIService.Iface {
 
   protected class FetchResultsTask implements Callable<TSFetchResultsResp> {
 
-    private final long sessionId;
+    private final IClientSession session;
     private final long queryId;
     private final int fetchSize;
     private final boolean isAlign;
 
-    public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) {
-      this.sessionId = sessionId;
+    public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) {
+      this.session = session;
       this.queryId = queryId;
       this.fetchSize = fetchSize;
       this.isAlign = isAlign;
@@ -270,8 +271,7 @@ public class TSServiceImpl implements TSIService.Iface {
       TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try {
         if (isAlign) {
-          TSQueryDataSet result =
-              fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+          TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = result.bufferForTime().limit() != 0;
           if (!hasResultSet) {
             SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
@@ -281,8 +281,7 @@ public class TSServiceImpl implements TSIService.Iface {
           resp.setIsAlign(true);
         } else {
           TSQueryNonAlignDataSet nonAlignResult =
-              fillRpcNonAlignReturnData(
-                  fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId));
+              fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername());
           boolean hasResultSet = false;
           for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
             if (timeBuffer.limit() != 0) {
@@ -323,8 +322,13 @@ public class TSServiceImpl implements TSIService.Iface {
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
     BasicOpenSessionResp openSessionResp =
-        serviceProvider.openSession(
-            req.username, req.password, req.zoneId, req.client_protocol, clientVersion);
+        serviceProvider.login(
+            SESSION_MANAGER.getCurrSession(),
+            req.username,
+            req.password,
+            req.zoneId,
+            req.client_protocol,
+            clientVersion);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage());
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
@@ -341,7 +345,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
-        !serviceProvider.closeSession(req.sessionId)
+        !serviceProvider.closeSession(SESSION_MANAGER.getCurrSession())
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -355,13 +359,18 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus closeOperation(TSCloseOperationReq req) {
     return serviceProvider.closeOperation(
-        req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId());
+        SESSION_MANAGER.getCurrSession(),
+        req.queryId,
+        req.statementId,
+        req.isSetStatementId(),
+        req.isSetQueryId());
   }
 
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     TSFetchMetadataResp resp = new TSFetchMetadataResp();
-    TSStatus status = checkLoginStatus(req.getSessionId());
+    TSStatus status = checkLoginStatus(session);
     if (isStatusNotSuccess(status)) {
       return resp.setStatus(status);
     }
@@ -503,10 +512,11 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     List<TSStatus> result = new ArrayList<>();
     boolean isAllSuccessful = true;
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
@@ -522,10 +532,7 @@ public class TSServiceImpl implements TSIService.Iface {
         PhysicalPlan physicalPlan =
             serviceProvider
                 .getPlanner()
-                .parseSQLToPhysicalPlan(
-                    statement,
-                    SESSION_MANAGER.getZoneId(req.sessionId),
-                    SESSION_MANAGER.getClientVersion(req.sessionId));
+                .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
         if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -539,7 +546,7 @@ public class TSServiceImpl implements TSIService.Iface {
             index = 0;
           }
 
-          TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
           if (status != null) {
             insertRowsPlan.getResults().put(index, status);
             isAllSuccessful = false;
@@ -561,7 +568,7 @@ public class TSServiceImpl implements TSIService.Iface {
             multiPlan = new CreateMultiTimeSeriesPlan();
             executeList.add(multiPlan);
           }
-          TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+          TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
           if (status != null) {
             multiPlan.getResults().put(i, status);
             isAllSuccessful = false;
@@ -586,7 +593,7 @@ public class TSServiceImpl implements TSIService.Iface {
             executeList.clear();
           }
           long t2 = System.currentTimeMillis();
-          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId());
+          TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, session);
           addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
           result.add(resp.status);
           if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -611,32 +618,24 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     String statement = req.getStatement();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
-
       long startTime = System.currentTimeMillis();
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getZoneId(req.getSessionId()),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
+        return submitQueryTask(session, physicalPlan, startTime, req);
       } else {
         return executeUpdateStatement(
-            statement,
-            req.statementId,
-            physicalPlan,
-            req.fetchSize,
-            req.timeout,
-            req.getSessionId());
+            session, statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
       }
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -651,8 +650,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -661,13 +661,10 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .parseSQLToPhysicalPlan(
-                  statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
-        return submitQueryTask(physicalPlan, startTime, req);
+        return submitQueryTask(session, physicalPlan, startTime, req);
       } else {
         return RpcUtils.getTSExecuteStatementResp(
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -687,8 +684,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -696,10 +694,7 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .rawDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .rawDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -708,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        session,
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -733,8 +728,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSExecuteStatementResp(loginStatus);
       }
@@ -742,10 +738,7 @@ public class TSServiceImpl implements TSIService.Iface {
       PhysicalPlan physicalPlan =
           serviceProvider
               .getPlanner()
-              .lastDataQueryReqToPhysicalPlan(
-                  req,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+              .lastDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion());
 
       if (physicalPlan.isQuery()) {
         Future<TSExecuteStatementResp> resp =
@@ -754,7 +747,7 @@ public class TSServiceImpl implements TSIService.Iface {
                     new QueryTask(
                         physicalPlan,
                         startTime,
-                        req.sessionId,
+                        session,
                         "",
                         req.statementId,
                         CONFIG.getQueryTimeoutThreshold(),
@@ -778,8 +771,9 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSExecuteStatementResp submitQueryTask(
-      PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception {
-    TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId());
+      IClientSession session, PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req)
+      throws Exception {
+    TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -788,7 +782,7 @@ public class TSServiceImpl implements TSIService.Iface {
         new QueryTask(
             physicalPlan,
             startTime,
-            req.sessionId,
+            session,
             req.statement,
             req.statementId,
             req.timeout,
@@ -929,15 +923,15 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSExecuteStatementResp executeSelectIntoStatement(
+      IClientSession session,
       String statement,
       long statementId,
       PhysicalPlan physicalPlan,
       int fetchSize,
-      long timeout,
-      long sessionId)
+      long timeout)
       throws IoTDBException, TException, SQLException, IOException, InterruptedException,
           QueryFilterOptimizationException {
-    TSStatus status = serviceProvider.checkAuthority(physicalPlan, sessionId);
+    TSStatus status = serviceProvider.checkAuthority(physicalPlan, session);
     if (status != null) {
       return new TSExecuteStatementResp(status);
     }
@@ -951,8 +945,7 @@ public class TSServiceImpl implements TSIService.Iface {
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
     QUERY_FREQUENCY_RECORDER.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement);
+    AUDIT_LOGGER.debug("Session {} execute select into: {}", session, statement);
     if (queryPlan.isEnableTracing()) {
       TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size());
     }
@@ -970,7 +963,7 @@ public class TSServiceImpl implements TSIService.Iface {
         if (insertTabletPlans.isEmpty()) {
           continue;
         }
-        TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId);
+        TSStatus executionStatus = insertTabletsInternally(session, insertTabletPlans);
         if (isStatusNotSuccess(executionStatus)
             && executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
           return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId);
@@ -989,11 +982,11 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   private TSStatus insertTabletsInternally(
-      List<InsertTabletPlan> insertTabletPlans, long sessionId) {
+      IClientSession session, List<InsertTabletPlan> insertTabletPlans) {
     InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
     for (int i = 0; i < insertTabletPlans.size(); i++) {
       InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
 
       if (status != null) {
         // not authorized
@@ -1009,7 +1002,8 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      IClientSession session = SESSION_MANAGER.getCurrSession();
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return RpcUtils.getTSFetchResultsResp(loginStatus);
       }
@@ -1020,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       Future<TSFetchResultsResp> resp =
           QueryTaskManager.getInstance()
-              .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign));
+              .submit(new FetchResultsTask(session, req.queryId, req.fetchSize, req.isAlign));
       return resp.get();
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
@@ -1075,7 +1069,8 @@ public class TSServiceImpl implements TSIService.Iface {
   /** update statement can be: 1. select-into statement 2. non-query statement */
   @Override
   public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return RpcUtils.getTSExecuteStatementResp(loginStatus);
     }
@@ -1084,19 +1079,12 @@ public class TSServiceImpl implements TSIService.Iface {
           serviceProvider
               .getPlanner()
               .parseSQLToPhysicalPlan(
-                  req.statement,
-                  SESSION_MANAGER.getZoneId(req.sessionId),
-                  SESSION_MANAGER.getClientVersion(req.sessionId));
+                  req.statement, session.getZoneId(), session.getClientVersion());
       return physicalPlan.isQuery()
           ? RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
           : executeUpdateStatement(
-              req.statement,
-              req.statementId,
-              physicalPlan,
-              req.fetchSize,
-              req.timeout,
-              req.getSessionId());
+              session, req.statement, req.statementId, physicalPlan, req.fetchSize, req.timeout);
     } catch (InterruptedException e) {
       LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
       Thread.currentThread().interrupt();
@@ -1112,21 +1100,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   /** update statement can be: 1. select-into statement 2. non-query statement */
   private TSExecuteStatementResp executeUpdateStatement(
+      IClientSession session,
       String statement,
       long statementId,
       PhysicalPlan plan,
       int fetchSize,
-      long timeout,
-      long sessionId)
+      long timeout)
       throws TException, SQLException, IoTDBException, IOException, InterruptedException,
           QueryFilterOptimizationException {
     return plan.isSelectInto()
-        ? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId)
-        : executeNonQueryStatement(plan, sessionId);
+        ? executeSelectIntoStatement(session, statement, statementId, plan, fetchSize, timeout)
+        : executeNonQueryStatement(plan, session);
   }
 
-  private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) {
-    TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+  private TSExecuteStatementResp executeNonQueryStatement(
+      PhysicalPlan plan, IClientSession session) {
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null
         ? new TSExecuteStatementResp(status)
         : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan))
@@ -1134,9 +1123,9 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   public void handleClientExit() {
-    Long sessionId = SESSION_MANAGER.getCurrSessionId();
-    if (sessionId != null) {
-      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    if (session != null) {
+      TSCloseSessionReq req = new TSCloseSessionReq();
       closeSession(req);
     }
   }
@@ -1151,7 +1140,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId);
+      ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId();
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1166,7 +1155,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone);
+      SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone));
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(
@@ -1202,14 +1191,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecords(TSInsertRecordsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1224,7 +1214,7 @@ public class TSServiceImpl implements TSIService.Iface {
                 req.getMeasurementsList().get(i).toArray(new String[0]),
                 req.valuesList.get(i),
                 req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
           allCheckSuccess = false;
@@ -1254,14 +1244,14 @@ public class TSServiceImpl implements TSIService.Iface {
   /**
    * Checking the Login Status.
    *
-   * @param sessionId Session id.
+   * @param session the client session whose login state and timeout are checked.
    * @return When not login or session timeout, will return error status.
    */
-  private TSStatus checkLoginStatus(long sessionId) {
-    if (!serviceProvider.checkLogin(sessionId)) {
+  private TSStatus checkLoginStatus(IClientSession session) {
+    if (!serviceProvider.checkLogin(session)) {
       return getNotLoggedInStatus();
     }
-    if (serviceProvider.checkSessionTimeout(sessionId)) {
+    if (serviceProvider.checkSessionTimeout(session)) {
       return getSessionTimeoutStatus();
     }
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1290,14 +1280,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1311,7 +1302,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.getMeasurementsList(),
               req.getValuesList(),
               req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       statusList.add(status != null ? status : executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
       statusList.add(
@@ -1336,14 +1327,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPath,
           req.getTimestamps().get(0));
     }
@@ -1359,7 +1351,7 @@ public class TSServiceImpl implements TSIService.Iface {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1394,14 +1386,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.prefixPaths.get(0),
           req.getTimestamps().get(0));
     }
@@ -1417,7 +1410,7 @@ public class TSServiceImpl implements TSIService.Iface {
         plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
         plan.setNeedInferType(true);
         plan.setAligned(req.isAligned);
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
 
         if (status != null) {
           insertRowsPlan.getResults().put(i, status);
@@ -1507,14 +1500,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertRecord(TSInsertRecordReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1525,7 +1519,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.getMeasurements().toArray(new String[0]),
               req.values,
               req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       if (status != null) {
         return status;
@@ -1542,14 +1536,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTimestamp());
 
@@ -1561,7 +1556,7 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       if (status != null) {
         return status;
@@ -1578,8 +1573,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteData(TSDeleteDataReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1591,7 +1587,7 @@ public class TSServiceImpl implements TSIService.Iface {
         paths.add(new PartialPath(path));
       }
       plan.addPaths(paths);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan));
     } catch (IoTDBException e) {
@@ -1604,9 +1600,10 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertTablet(TSInsertTabletReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1621,7 +1618,7 @@ public class TSServiceImpl implements TSIService.Iface {
       insertTabletPlan.setRowCount(req.size);
       insertTabletPlan.setDataTypes(req.types);
       insertTabletPlan.setAligned(req.isAligned);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
 
       if (status != null) {
         return status;
@@ -1640,13 +1637,14 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     long t1 = System.currentTimeMillis();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
-      return insertTabletsInternally(req);
+      return insertTabletsInternally(session, req);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode());
     } catch (NullPointerException e) {
@@ -1662,6 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
       throws IllegalPathException {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     InsertTabletPlan insertTabletPlan =
         new InsertTabletPlan(new PartialPath(req.prefixPaths.get(i)), req.measurementsList.get(i));
     insertTabletPlan.setTimes(
@@ -1682,12 +1681,13 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   /** construct one InsertMultiTabletPlan and process it */
-  public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws IllegalPathException {
+  private TSStatus insertTabletsInternally(IClientSession session, TSInsertTabletsReq req)
+      throws IllegalPathException {
     List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
     InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
     for (int i = 0; i < req.prefixPaths.size(); i++) {
       InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
-      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session);
       if (status != null) {
         // not authorized
         insertMultiTabletPlan.getResults().put(i, status);
@@ -1701,14 +1701,15 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
 
       SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup));
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
@@ -1721,8 +1722,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1732,7 +1734,7 @@ public class TSServiceImpl implements TSIService.Iface {
         storageGroupList.add(new PartialPath(storageGroup));
       }
       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList);
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode());
@@ -1744,14 +1746,14 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+        AUDIT_LOGGER.debug("Session-{} create timeseries {}", session, req.getPath());
       }
 
       CreateTimeSeriesPlan plan =
@@ -1764,7 +1766,7 @@ public class TSServiceImpl implements TSIService.Iface {
               req.tags,
               req.attributes,
               req.measurementAlias);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode());
@@ -1776,8 +1778,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1785,7 +1788,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create aligned timeseries {}.{}",
-            SESSION_MANAGER.getCurrSessionId(),
+            session,
             req.getPrefixPath(),
             req.getMeasurements());
       }
@@ -1811,7 +1814,7 @@ public class TSServiceImpl implements TSIService.Iface {
               encodings,
               compressors,
               req.measurementAlias);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode());
@@ -1824,15 +1827,16 @@ public class TSServiceImpl implements TSIService.Iface {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            SESSION_MANAGER.getCurrSessionId(),
+            session,
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1863,7 +1867,7 @@ public class TSServiceImpl implements TSIService.Iface {
       CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan();
       for (int i = 0; i < req.paths.size(); i++) {
         plan.setPath(new PartialPath(req.paths.get(i)));
-        TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+        TSStatus status = serviceProvider.checkAuthority(plan, session);
         if (status != null) {
           // not authorized
           multiPlan.getResults().put(i, status);
@@ -1911,8 +1915,9 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus deleteTimeseries(long sessionId, List<String> paths) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(sessionId);
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
@@ -1922,7 +1927,7 @@ public class TSServiceImpl implements TSIService.Iface {
         pathList.add(new PartialPath(path));
       }
       DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode());
@@ -1934,28 +1939,27 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public long requestStatementId(long sessionId) {
-    return SESSION_MANAGER.requestStatementId(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    return SESSION_MANAGER.requestStatementId(session);
   }
 
   @Override
   public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     try {
-      TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+      TSStatus loginStatus = checkLoginStatus(session);
       if (isStatusNotSuccess(loginStatus)) {
         return loginStatus;
       }
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug(
-            "Session-{} create schema template {}",
-            SESSION_MANAGER.getCurrSessionId(),
-            req.getName());
+        AUDIT_LOGGER.debug("Session-{} create schema template {}", session, req.getName());
       }
 
       CreateTemplatePlan plan;
       // Construct plan from serialized request
       ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate());
       plan = CreateTemplatePlan.deserializeFromReq(buffer);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
 
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (Exception e) {
@@ -1966,6 +1970,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     int size = req.getMeasurementsSize();
     String[] measurements = new String[size];
     TSDataType[] dataTypes = new TSDataType[size];
@@ -1982,20 +1987,22 @@ public class TSServiceImpl implements TSIService.Iface {
     AppendTemplatePlan plan =
         new AppendTemplatePlan(
             req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes);
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
   @Override
   public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     PruneTemplatePlan plan =
         new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
   @Override
   public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     TSQueryTemplateResp resp = new TSQueryTemplateResp();
     try {
       String path;
@@ -2044,21 +2051,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getTemplateName(),
           req.getPrefixPath());
     }
 
     try {
       SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2067,21 +2075,22 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} unset schema template {}.{}",
-          SESSION_MANAGER.getCurrSessionId(),
+          session,
           req.getPrefixPath(),
           req.getTemplateName());
     }
 
     try {
       UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName);
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2091,23 +2100,21 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus unsetUsingTemplate(long sessionId, String templateName, String prefixPath)
       throws TException {
-    TSStatus loginStatus = checkLoginStatus(sessionId);
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
 
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "Session-{} unset using schema template {} on {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          templateName,
-          prefixPath);
+          "Session-{} unset using schema template {} on {}", session, templateName, prefixPath);
     }
 
     try {
       DeactivateTemplatePlan plan =
           new DeactivateTemplatePlan(templateName, new PartialPath(prefixPath));
-      TSStatus status = serviceProvider.checkAuthority(plan, sessionId);
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (MetadataException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2116,20 +2123,19 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus setUsingTemplate(TSSetUsingTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
-          "Session-{} create timeseries of schema template on path {}",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getDstPath());
+          "Session-{} create timeseries of schema template on path {}", session, req.getDstPath());
     }
 
     try {
       ActivateTemplatePlan plan = new ActivateTemplatePlan(new PartialPath(req.getDstPath()));
-      TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+      TSStatus status = serviceProvider.checkAuthority(plan, session);
       return status != null ? status : executeNonQueryPlan(plan);
     } catch (IllegalPathException e) {
       return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode());
@@ -2138,19 +2144,17 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException {
-    TSStatus loginStatus = checkLoginStatus(req.getSessionId());
+    IClientSession session = SESSION_MANAGER.getCurrSession();
+    TSStatus loginStatus = checkLoginStatus(session);
     if (isStatusNotSuccess(loginStatus)) {
       return loginStatus;
     }
     if (AUDIT_LOGGER.isDebugEnabled()) {
-      AUDIT_LOGGER.debug(
-          "Session-{} drop schema template {}.",
-          SESSION_MANAGER.getCurrSessionId(),
-          req.getTemplateName());
+      AUDIT_LOGGER.debug("Session-{} drop schema template {}.", session, req.getTemplateName());
     }
 
     DropTemplatePlan plan = new DropTemplatePlan(req.templateName);
-    TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
+    TSStatus status = serviceProvider.checkAuthority(plan, session);
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
@@ -2164,6 +2168,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+    IClientSession session = SESSION_MANAGER.getCurrSession();
     PhysicalPlan physicalPlan;
     try {
       ByteBuffer planBuffer = req.physicalPlan;


[iotdb] 01/02: update jdbc and client-py's version to release 0.13.3 (#7687)

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature-client-session-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 198068730d56c059c791960c99a00a386d10760e
Author: 张正明 <87...@qq.com>
AuthorDate: Fri Oct 21 16:53:18 2022 +0800

    update jdbc and client-py's version to release 0.13.3 (#7687)
---
 client-py/setup.py                | 2 +-
 jdbc/src/main/feature/feature.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/client-py/setup.py b/client-py/setup.py
index ed0db4a570..eae6136370 100644
--- a/client-py/setup.py
+++ b/client-py/setup.py
@@ -31,7 +31,7 @@ print(long_description)
 
 setuptools.setup(
     name="apache-iotdb",  # Replace with your own username
-    version="0.13.2",
+    version="0.13.3",
     author=" Apache Software Foundation",
     author_email="dev@iotdb.apache.org",
     description="Apache IoTDB client API",
diff --git a/jdbc/src/main/feature/feature.xml b/jdbc/src/main/feature/feature.xml
index 312143069f..f65acaad60 100644
--- a/jdbc/src/main/feature/feature.xml
+++ b/jdbc/src/main/feature/feature.xml
@@ -18,7 +18,7 @@
 
 -->
 <features xmlns="http://karaf.apache.org/xmlns/features/v1.5.0" name="driver-s7-feature">
-    <feature name="iotdb-feature" description="iotdb-feature" version="0.13.2">
+    <feature name="iotdb-feature" description="iotdb-feature" version="0.13.3">
         <details>Feature to install required Bundle to use IoTDB inside Karaf container</details>
         <feature prerequisite="true">wrap</feature>
         <feature>scr</feature>