You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/27 10:56:42 UTC

[iotdb] 01/01: [Draft] python cache leader

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d21dccfb531e30069132249fb41d0c6dcf507942
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Mar 27 18:52:38 2023 +0800

    [Draft] python cache leader
---
 client-py/iotdb/Session.py | 383 ++++++++++++++++++++++++++++++---------------
 1 file changed, 261 insertions(+), 122 deletions(-)

diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..ac2e7a6f7f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -91,6 +91,7 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         self.__host = host
         self.__port = port
@@ -101,13 +102,16 @@ class Session(object):
         self.__password = password
         self.__fetch_size = fetch_size
         self.__is_close = True
-        self.__transport = None
-        self.__client = None
+        self.__default_client = None
+        self.__default_connection = None
         self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
         self.__enable_rpc_compression = None
+        self.__enable_redirection = enable_redirection
+        self.__device_id_to_endpoint = None
+        self.__endpoint_to_connection = None
 
     @classmethod
     def init_from_node_urls(
@@ -117,10 +121,13 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         if node_urls is None:
             raise RuntimeError("node urls is empty")
-        session = Session(None, None, user, password, fetch_size, zone_id)
+        session = Session(
+            None, None, user, password, fetch_size, zone_id, enable_redirection
+        )
         session.__hosts = []
         session.__ports = []
         for node_url in node_urls:
@@ -137,34 +144,45 @@ class Session(object):
             return
         self.__enable_rpc_compression = enable_rpc_compression
         if self.__hosts is None:
-            self.init_connection(self.__default_endpoint)
+            self.__default_connection = self.init_connection(self.__default_endpoint)
+            self.__default_client = self.__default_connection.client
         else:
             for i in range(0, len(self.__hosts)):
                 self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
                 try:
-                    self.init_connection(self.__default_endpoint)
+                    self.__default_connection = self.init_connection(
+                        self.__default_endpoint
+                    )
+                    self.__default_client = self.__default_connection.client
                 except Exception as e:
                     if not self.reconnect():
                         logger.error("Cluster has no nodes to connect")
                         raise e
                 break
+        self.__is_close = False
+        if self.__enable_redirection:
+            self.__device_id_to_endpoint = {}
+            self.__endpoint_to_connection = {
+                self.__default_endpoint: self.__default_connection
+            }
 
     def init_connection(self, endpoint):
-        self.__transport = TTransport.TFramedTransport(
+        transport = TTransport.TFramedTransport(
             TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
-        if not self.__transport.isOpen():
+        if not transport.isOpen():
             try:
-                self.__transport.open()
+                transport.open()
             except TTransport.TTransportException as e:
                 raise e
 
         if self.__enable_rpc_compression:
-            self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+            client = Client(TCompactProtocol.TCompactProtocol(transport))
         else:
-            self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+            client = Client(TBinaryProtocol.TBinaryProtocol(transport))
 
+        connection = SessionConnection(client, transport)
         open_req = TSOpenSessionReq(
             client_protocol=self.protocol_version,
             username=self.__user,
@@ -174,7 +192,8 @@ class Session(object):
         )
 
         try:
-            open_resp = self.__client.openSession(open_req)
+            open_resp = client.openSession(open_req)
+            Session.verify_success(open_resp.status)
 
             if self.protocol_version != open_resp.serverProtocolVersion:
                 logger.exception(
@@ -187,19 +206,23 @@ class Session(object):
                     raise TTransport.TException(message="Protocol not supported.")
 
             self.__session_id = open_resp.sessionId
-            self.__statement_id = self.__client.requestStatementId(self.__session_id)
+            self.__statement_id = client.requestStatementId(self.__session_id)
 
         except Exception as e:
-            self.__transport.close()
+            transport.close()
             logger.exception("session closed because: ", exc_info=e)
             raise e
 
         if self.__zone_id is not None:
-            self.set_time_zone(self.__zone_id)
+            request = TSSetTimeZoneReq(self.__session_id, self.__zone_id)
+            try:
+                client.setTimeZone(request)
+            except TTransport.TException as e:
+                raise RuntimeError("Could not set time zone because: ", e)
         else:
             self.__zone_id = self.get_time_zone()
 
-        self.__is_close = False
+        return connection
 
     def is_open(self):
         return not self.__is_close
@@ -209,16 +232,13 @@ class Session(object):
             return
         req = TSCloseSessionReq(self.__session_id)
         try:
-            self.__client.closeSession(req)
-        except TTransport.TException as e:
-            logger.exception(
-                "Error occurs when closing session at server. Maybe server is down. Error message: ",
-                exc_info=e,
-            )
+            if self.__enable_redirection:
+                for connection in self.__endpoint_to_connection.values():
+                    connection.close_connection(req)
+            else:
+                self.__default_connection.close_connection(req)
         finally:
             self.__is_close = True
-            if self.__transport is not None:
-                self.__transport.close()
 
     def set_storage_group(self, group_name):
         """
@@ -227,13 +247,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.setStorageGroup(self.__session_id, group_name)
+                self.__default_client.setStorageGroup(self.__session_id, group_name)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.setStorageGroup(self.__session_id, group_name)
+                        self.__default_client.setStorageGroup(
+                            self.__session_id, group_name
+                        )
                     )
                 except TTransport.TException as e1:
                     logger.exception("create databases fails because: ", e1)
@@ -256,13 +278,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
+                self.__default_client.deleteStorageGroups(
+                    self.__session_id, storage_group_lst
+                )
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.deleteStorageGroups(
+                        self.__default_client.deleteStorageGroups(
                             self.__session_id, storage_group_lst
                         )
                     )
@@ -309,13 +333,15 @@ class Session(object):
             alias,
         )
         try:
-            return Session.verify_success(self.__client.createTimeseries(request))
+            return Session.verify_success(
+                self.__default_client.createTimeseries(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createTimeseries(request)
+                        self.__default_client.createTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating time series fails because: ", e1)
@@ -348,14 +374,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.createAlignedTimeseries(request)
+                self.__default_client.createAlignedTimeseries(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createAlignedTimeseries(request)
+                        self.__default_client.createAlignedTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating time series fails because: ", e1)
@@ -400,13 +426,15 @@ class Session(object):
             alias_lst,
         )
         try:
-            return Session.verify_success(self.__client.createMultiTimeseries(request))
+            return Session.verify_success(
+                self.__default_client.createMultiTimeseries(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createMultiTimeseries(request)
+                        self.__default_client.createMultiTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating multi time series fails because: ", e1)
@@ -421,13 +449,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.deleteTimeseries(self.__session_id, paths_list)
+                self.__default_client.deleteTimeseries(self.__session_id, paths_list)
             )
         except TTransport.TException:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                        self.__default_client.deleteTimeseries(
+                            self.__session_id, paths_list
+                        )
                     )
                 except TTransport.TException as e1:
                     logger.exception("deleting time series fails because: ", e1)
@@ -454,12 +484,14 @@ class Session(object):
             self.__session_id, paths_list, -9223372036854775808, end_time
         )
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return Session.verify_success(self.__default_client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.deleteData(request))
+                    return Session.verify_success(
+                        self.__default_client.deleteData(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("data deletion fails because: ", e1)
                     raise e1
@@ -475,12 +507,14 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return Session.verify_success(self.__default_client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.deleteData(request))
+                    return Session.verify_success(
+                        self.__default_client.deleteData(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("data deletion fails because: ", e1)
                     raise e1
@@ -497,13 +531,15 @@ class Session(object):
             device_id, timestamp, measurements, string_values
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertStringRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecord(request)
+                        self.__default_client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -523,13 +559,15 @@ class Session(object):
             device_id, timestamp, measurements, string_values, True
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertStringRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecord(request)
+                        self.__default_client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -554,12 +592,16 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecord(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecord(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -586,12 +628,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
         try:
-            return Session.verify_success(self.__client.insertRecords(request))
+            return Session.verify_success(self.__default_client.insertRecords(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecords(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -617,12 +661,16 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values, True
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecord(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecord(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -649,12 +697,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
         try:
-            return Session.verify_success(self.__client.insertRecords(request))
+            return Session.verify_success(self.__default_client.insertRecords(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecords(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -678,12 +728,14 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return Session.verify_success(self.__client.testInsertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).testInsertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.testInsertRecord(request)
+                        self.__default_client.testInsertRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -711,12 +763,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
         try:
-            return Session.verify_success(self.__client.testInsertRecords(request))
+            return Session.verify_success(
+                self.__default_client.testInsertRecords(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.testInsertRecords(request)
+                        self.__default_client.testInsertRecords(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -805,12 +859,16 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).insertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablet(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablet(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -824,12 +882,14 @@ class Session(object):
         """
         request = self.gen_insert_tablets_req(tablet_lst)
         try:
-            return Session.verify_success(self.__client.insertTablets(request))
+            return Session.verify_success(self.__default_client.insertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablets(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -850,12 +910,16 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet, True)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).insertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablet(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablet(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -869,12 +933,14 @@ class Session(object):
         """
         request = self.gen_insert_tablets_req(tablet_lst, True)
         try:
-            return Session.verify_success(self.__client.insertTablets(request))
+            return Session.verify_success(self.__default_client.insertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablets(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -934,14 +1000,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+                self.get_client(device_id).insertRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertRecordsOfOneDevice(request)
+                        self.__default_client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1003,14 +1069,14 @@ class Session(object):
         # send request
         try:
             return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+                self.get_client(device_id).insertRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertRecordsOfOneDevice(request)
+                        self.__default_client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1054,15 +1120,17 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
+        request = self.gen_insert_tablet_req(tablet)
         try:
-            request = self.gen_insert_tablet_req(tablet)
-            return Session.verify_success(self.__client.testInsertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).testInsertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.testInsertTablet(request)
+                        self.__default_client.testInsertTablet(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -1076,14 +1144,17 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
+        request = self.gen_insert_tablets_req(tablet_list)
         try:
-            request = self.gen_insert_tablets_req(tablet_list)
-            return Session.verify_success(self.__client.testInsertTablets(request))
+            return Session.verify_success(
+                self.__default_client.testInsertTablets(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
+                    request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.testInsertTablets(request)
+                        self.__default_client.testInsertTablets(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -1142,13 +1213,13 @@ class Session(object):
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
         try:
-            resp = self.__client.executeQueryStatement(request)
+            resp = self.__default_client.executeQueryStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeQueryStatement(request)
+                    resp = self.__default_client.executeQueryStatement(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1161,7 +1232,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1175,13 +1246,13 @@ class Session(object):
         """
         request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
         try:
-            resp = self.__client.executeUpdateStatement(request)
+            resp = self.__default_client.executeUpdateStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeUpdateStatement(request)
+                    resp = self.__default_client.executeUpdateStatement(request)
                 except TTransport.TException as e1:
                     logger.exception(
                         "execution of non-query statement fails because: ", e1
@@ -1196,13 +1267,13 @@ class Session(object):
             self.__session_id, sql, self.__statement_id, timeout
         )
         try:
-            resp = self.__client.executeStatement(request)
+            resp = self.__default_client.executeStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeStatement(request)
+                    resp = self.__default_client.executeStatement(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of statement fails because: ", e1)
                     raise e1
@@ -1216,7 +1287,7 @@ class Session(object):
                 resp.dataTypeList,
                 resp.columnNameIndexMap,
                 resp.queryId,
-                self.__client,
+                self.__default_client,
                 self.__statement_id,
                 self.__session_id,
                 resp.queryDataSet,
@@ -1276,7 +1347,7 @@ class Session(object):
         if self.__zone_id is not None:
             return self.__zone_id
         try:
-            resp = self.__client.getTimeZone(self.__session_id)
+            resp = self.__default_client.getTimeZone(self.__session_id)
         except TTransport.TException as e:
             raise RuntimeError("Could not get time zone because: ", e)
         return resp.timeZone
@@ -1284,7 +1355,7 @@ class Session(object):
     def set_time_zone(self, zone_id):
         request = TSSetTimeZoneReq(self.__session_id, zone_id)
         try:
-            status = self.__client.setTimeZone(request)
+            status = self.__default_client.setTimeZone(request)
             logger.debug(
                 "setting time zone_id as {}, message: {}".format(
                     zone_id, status.message
@@ -1354,13 +1425,13 @@ class Session(object):
             enableRedirectQuery=False,
         )
         try:
-            resp = self.__client.executeRawDataQuery(request)
+            resp = self.__default_client.executeRawDataQuery(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeRawDataQuery(request)
+                    resp = self.__default_client.executeRawDataQuery(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1373,7 +1444,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1396,13 +1467,13 @@ class Session(object):
             enableRedirectQuery=False,
         )
         try:
-            resp = self.__client.executeLastDataQuery(request)
+            resp = self.__default_client.executeLastDataQuery(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeLastDataQuery(request)
+                    resp = self.__default_client.executeLastDataQuery(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1415,7 +1486,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1449,14 +1520,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+                self.get_client(device_id).insertStringRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecordsOfOneDevice(request)
+                        self.__default_client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1481,14 +1552,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+                self.get_client(device_id).insertStringRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecordsOfOneDevice(request)
+                        self.__default_client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1501,8 +1572,8 @@ class Session(object):
             return False
         connected = False
         for i in range(1, self.RETRY_NUM + 1):
-            if self.__transport is not None:
-                self.__transport.close()
+            if self.__default_connection.transport is not None:
+                self.__default_connection.transport.close()
                 curr_host_index = random.randint(0, len(self.__hosts))
                 try_host_num = 0
                 for j in range(curr_host_index, len(self.__hosts)):
@@ -1515,7 +1586,10 @@ class Session(object):
                         j = -1
                     try_host_num += 1
                     try:
-                        self.init_connection(self.__default_endpoint)
+                        self.__default_connection = self.init_connection(
+                            self.__default_endpoint
+                        )
+                        self.__default_client = self.__default_connection.client
                         connected = True
                     except TTransport.TException as e:
                         continue
@@ -1524,6 +1598,37 @@ class Session(object):
                 break
         return connected
 
+    def get_client(self, device_id):
+        """Return the client cached for device_id's endpoint, else the default."""
+        # dict.has_key() was removed in Python 3, so entries are looked up with
+        # .get(); `or {}` tolerates caches still holding their __init__ None.
+        if not self.__enable_redirection:
+            return self.__default_client
+        endpoint = (self.__device_id_to_endpoint or {}).get(device_id)
+        connection = (self.__endpoint_to_connection or {}).get(endpoint)
+        # A missing (or None) connection falls back to the default client.
+        return self.__default_client if connection is None else connection.client
+
+    def handle_redirection(self, device_id, endpoint):
+        """Map device_id to endpoint, opening and caching a connection if needed."""
+        # Compare with ==: `is` tests identity, which is unreliable for strings.
+        if not self.__enable_redirection or endpoint.ip == "0.0.0.0":
+            return
+        # has_key() is gone in Python 3; assigning unconditionally is equivalent.
+        self.__device_id_to_endpoint[device_id] = endpoint
+        if endpoint in self.__endpoint_to_connection:
+            connection = self.__endpoint_to_connection[endpoint]
+        else:
+            try:
+                connection = self.init_connection(endpoint)
+            except Exception:
+                # Best effort: log, then cache None so the endpoint is not retried.
+                logger.debug("cannot connect to %s", endpoint, exc_info=True)
+                connection = None
+            self.__endpoint_to_connection[endpoint] = connection
+        if connection is None:
+            self.__device_id_to_endpoint.pop(device_id, None)
+
     def gen_insert_string_records_of_one_device_request(
         self,
         device_id,
@@ -1562,13 +1667,15 @@ class Session(object):
             self.__session_id, template.get_name(), bytes_array
         )
         try:
-            return Session.verify_success(self.__client.createSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.createSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createSchemaTemplate(request)
+                        self.__default_client.createSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("create template fails because: ", e1)
@@ -1583,13 +1690,15 @@ class Session(object):
         """
         request = TSDropSchemaTemplateReq(self.__session_id, template_name)
         try:
-            return Session.verify_success(self.__client.dropSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.dropSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.dropSchemaTemplate(request)
+                        self.__default_client.dropSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("drop template fails because: ", e1)
@@ -1625,13 +1734,15 @@ class Session(object):
             list(map(lambda x: x.value, compressors)),
         )
         try:
-            return Session.verify_success(self.__client.appendSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.appendSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.appendSchemaTemplate(request)
+                        self.__default_client.appendSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("append template fails because: ", e1)
@@ -1647,13 +1758,15 @@ class Session(object):
         """
         request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
         try:
-            return Session.verify_success(self.__client.pruneSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.pruneSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.pruneSchemaTemplate(request)
+                        self.__default_client.pruneSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("prune template fails because: ", e1)
@@ -1669,13 +1782,15 @@ class Session(object):
         """
         request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
         try:
-            return Session.verify_success(self.__client.setSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.setSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.setSchemaTemplate(request)
+                        self.__default_client.setSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("set template fails because: ", e1)
@@ -1694,13 +1809,15 @@ class Session(object):
             self.__session_id, prefix_path, template_name
         )
         try:
-            return Session.verify_success(self.__client.unsetSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.unsetSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.unsetSchemaTemplate(request)
+                        self.__default_client.unsetSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("unset template fails because: ", e1)
@@ -1719,13 +1836,13 @@ class Session(object):
             TemplateQueryType.COUNT_MEASUREMENTS.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             return response.count
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.count
                 except TTransport.TException as e1:
@@ -1749,14 +1866,14 @@ class Session(object):
             path,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
@@ -1778,14 +1895,14 @@ class Session(object):
             self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
@@ -1809,14 +1926,14 @@ class Session(object):
             pattern,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1837,14 +1954,14 @@ class Session(object):
             TemplateQueryType.SHOW_TEMPLATES.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1862,14 +1979,14 @@ class Session(object):
             self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1889,14 +2006,14 @@ class Session(object):
             TemplateQueryType.SHOW_USING_TEMPLATES.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1904,3 +2021,25 @@ class Session(object):
                     raise e1
             else:
                 raise e
+
+
+class SessionConnection(object):
+    """Hold a client together with the transport it was created on."""
+
+    def __init__(self, client, transport):
+        self.client = client
+        self.transport = transport
+
+    def close_connection(self, req):
+        """Send closeSession(req), then close the transport if there is one."""
+        message = "Error occurs when closing session at server. Maybe server is down. Error message: "
+        try:
+            self.client.closeSession(req)
+        except TTransport.TException as error:
+            # The failure is only logged, not re-raised; the `finally` clause
+            # below still closes the transport.
+            logger.exception(message, exc_info=error)
+        finally:
+            transport = self.transport
+            if transport is not None:
+                transport.close()