You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/27 10:56:41 UTC

[iotdb] branch python_cache_leader created (now d21dccfb53)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d21dccfb53 [Draft] python cache leader

This branch includes the following new commits:

     new d21dccfb53 [Draft] python cache leader

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [Draft] python cache leader

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d21dccfb531e30069132249fb41d0c6dcf507942
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Mar 27 18:52:38 2023 +0800

    [Draft] python cache leader
---
 client-py/iotdb/Session.py | 383 ++++++++++++++++++++++++++++++---------------
 1 file changed, 261 insertions(+), 122 deletions(-)

diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..ac2e7a6f7f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -91,6 +91,7 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         self.__host = host
         self.__port = port
@@ -101,13 +102,16 @@ class Session(object):
         self.__password = password
         self.__fetch_size = fetch_size
         self.__is_close = True
-        self.__transport = None
-        self.__client = None
+        self.__default_client = None
+        self.__default_connection = None
         self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
         self.__enable_rpc_compression = None
+        self.__enable_redirection = enable_redirection
+        self.__device_id_to_endpoint = None
+        self.__endpoint_to_connection = None
 
     @classmethod
     def init_from_node_urls(
@@ -117,10 +121,13 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         if node_urls is None:
             raise RuntimeError("node urls is empty")
-        session = Session(None, None, user, password, fetch_size, zone_id)
+        session = Session(
+            None, None, user, password, fetch_size, zone_id, enable_redirection
+        )
         session.__hosts = []
         session.__ports = []
         for node_url in node_urls:
@@ -137,34 +144,45 @@ class Session(object):
             return
         self.__enable_rpc_compression = enable_rpc_compression
         if self.__hosts is None:
-            self.init_connection(self.__default_endpoint)
+            self.__default_connection = self.init_connection(self.__default_endpoint)
+            self.__default_client = self.__default_connection.client
         else:
             for i in range(0, len(self.__hosts)):
                 self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
                 try:
-                    self.init_connection(self.__default_endpoint)
+                    self.__default_connection = self.init_connection(
+                        self.__default_endpoint
+                    )
+                    self.__default_client = self.__default_connection.client
                 except Exception as e:
                     if not self.reconnect():
                         logger.error("Cluster has no nodes to connect")
                         raise e
                 break
+        self.__is_close = False
+        if self.__enable_redirection:
+            self.__device_id_to_endpoint = {}
+            self.__endpoint_to_connection = {
+                self.__default_endpoint: self.__default_connection
+            }
 
     def init_connection(self, endpoint):
-        self.__transport = TTransport.TFramedTransport(
+        transport = TTransport.TFramedTransport(
             TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
-        if not self.__transport.isOpen():
+        if not transport.isOpen():
             try:
-                self.__transport.open()
+                transport.open()
             except TTransport.TTransportException as e:
                 raise e
 
         if self.__enable_rpc_compression:
-            self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+            client = Client(TCompactProtocol.TCompactProtocol(transport))
         else:
-            self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+            client = Client(TBinaryProtocol.TBinaryProtocol(transport))
 
+        connection = SessionConnection(client, transport)
         open_req = TSOpenSessionReq(
             client_protocol=self.protocol_version,
             username=self.__user,
@@ -174,7 +192,8 @@ class Session(object):
         )
 
         try:
-            open_resp = self.__client.openSession(open_req)
+            open_resp = client.openSession(open_req)
+            Session.verify_success(open_resp.status)
 
             if self.protocol_version != open_resp.serverProtocolVersion:
                 logger.exception(
@@ -187,19 +206,23 @@ class Session(object):
                     raise TTransport.TException(message="Protocol not supported.")
 
             self.__session_id = open_resp.sessionId
-            self.__statement_id = self.__client.requestStatementId(self.__session_id)
+            self.__statement_id = client.requestStatementId(self.__session_id)
 
         except Exception as e:
-            self.__transport.close()
+            transport.close()
             logger.exception("session closed because: ", exc_info=e)
             raise e
 
         if self.__zone_id is not None:
-            self.set_time_zone(self.__zone_id)
+            request = TSSetTimeZoneReq(self.__session_id, self.__zone_id)
+            try:
+                client.setTimeZone(request)
+            except TTransport.TException as e:
+                raise RuntimeError("Could not set time zone because: ", e)
         else:
             self.__zone_id = self.get_time_zone()
 
-        self.__is_close = False
+        return connection
 
     def is_open(self):
         return not self.__is_close
@@ -209,16 +232,13 @@ class Session(object):
             return
         req = TSCloseSessionReq(self.__session_id)
         try:
-            self.__client.closeSession(req)
-        except TTransport.TException as e:
-            logger.exception(
-                "Error occurs when closing session at server. Maybe server is down. Error message: ",
-                exc_info=e,
-            )
+            if self.__enable_redirection:
+                for connection in self.__endpoint_to_connection.values():
+                    connection.close_connection(req)
+            else:
+                self.__default_connection.close_connection(req)
         finally:
             self.__is_close = True
-            if self.__transport is not None:
-                self.__transport.close()
 
     def set_storage_group(self, group_name):
         """
@@ -227,13 +247,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.setStorageGroup(self.__session_id, group_name)
+                self.__default_client.setStorageGroup(self.__session_id, group_name)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.setStorageGroup(self.__session_id, group_name)
+                        self.__default_client.setStorageGroup(
+                            self.__session_id, group_name
+                        )
                     )
                 except TTransport.TException as e1:
                     logger.exception("create databases fails because: ", e1)
@@ -256,13 +278,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
+                self.__default_client.deleteStorageGroups(
+                    self.__session_id, storage_group_lst
+                )
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.deleteStorageGroups(
+                        self.__default_client.deleteStorageGroups(
                             self.__session_id, storage_group_lst
                         )
                     )
@@ -309,13 +333,15 @@ class Session(object):
             alias,
         )
         try:
-            return Session.verify_success(self.__client.createTimeseries(request))
+            return Session.verify_success(
+                self.__default_client.createTimeseries(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createTimeseries(request)
+                        self.__default_client.createTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating time series fails because: ", e1)
@@ -348,14 +374,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.createAlignedTimeseries(request)
+                self.__default_client.createAlignedTimeseries(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createAlignedTimeseries(request)
+                        self.__default_client.createAlignedTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating time series fails because: ", e1)
@@ -400,13 +426,15 @@ class Session(object):
             alias_lst,
         )
         try:
-            return Session.verify_success(self.__client.createMultiTimeseries(request))
+            return Session.verify_success(
+                self.__default_client.createMultiTimeseries(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createMultiTimeseries(request)
+                        self.__default_client.createMultiTimeseries(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("creating multi time series fails because: ", e1)
@@ -421,13 +449,15 @@ class Session(object):
         """
         try:
             return Session.verify_success(
-                self.__client.deleteTimeseries(self.__session_id, paths_list)
+                self.__default_client.deleteTimeseries(self.__session_id, paths_list)
             )
         except TTransport.TException:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                        self.__default_client.deleteTimeseries(
+                            self.__session_id, paths_list
+                        )
                     )
                 except TTransport.TException as e1:
                     logger.exception("deleting time series fails because: ", e1)
@@ -454,12 +484,14 @@ class Session(object):
             self.__session_id, paths_list, -9223372036854775808, end_time
         )
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return Session.verify_success(self.__default_client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.deleteData(request))
+                    return Session.verify_success(
+                        self.__default_client.deleteData(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("data deletion fails because: ", e1)
                     raise e1
@@ -475,12 +507,14 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return Session.verify_success(self.__default_client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.deleteData(request))
+                    return Session.verify_success(
+                        self.__default_client.deleteData(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("data deletion fails because: ", e1)
                     raise e1
@@ -497,13 +531,15 @@ class Session(object):
             device_id, timestamp, measurements, string_values
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertStringRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecord(request)
+                        self.__default_client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -523,13 +559,15 @@ class Session(object):
             device_id, timestamp, measurements, string_values, True
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertStringRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecord(request)
+                        self.__default_client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -554,12 +592,16 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecord(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecord(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -586,12 +628,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
         try:
-            return Session.verify_success(self.__client.insertRecords(request))
+            return Session.verify_success(self.__default_client.insertRecords(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecords(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -617,12 +661,16 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values, True
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).insertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecord(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecord(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -649,12 +697,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
         try:
-            return Session.verify_success(self.__client.insertRecords(request))
+            return Session.verify_success(self.__default_client.insertRecords(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
+                    return Session.verify_success(
+                        self.__default_client.insertRecords(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -678,12 +728,14 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return Session.verify_success(self.__client.testInsertRecord(request))
+            return Session.verify_success(
+                self.get_client(device_id).testInsertRecord(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.testInsertRecord(request)
+                        self.__default_client.testInsertRecord(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -711,12 +763,14 @@ class Session(object):
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
         try:
-            return Session.verify_success(self.__client.testInsertRecords(request))
+            return Session.verify_success(
+                self.__default_client.testInsertRecords(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     return Session.verify_success(
-                        self.__client.testInsertRecords(request)
+                        self.__default_client.testInsertRecords(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -805,12 +859,16 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).insertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablet(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablet(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -824,12 +882,14 @@ class Session(object):
         """
         request = self.gen_insert_tablets_req(tablet_lst)
         try:
-            return Session.verify_success(self.__client.insertTablets(request))
+            return Session.verify_success(self.__default_client.insertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablets(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -850,12 +910,16 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet, True)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).insertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablet(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablet(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -869,12 +933,14 @@ class Session(object):
         """
         request = self.gen_insert_tablets_req(tablet_lst, True)
         try:
-            return Session.verify_success(self.__client.insertTablets(request))
+            return Session.verify_success(self.__default_client.insertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
+                    return Session.verify_success(
+                        self.__default_client.insertTablets(request)
+                    )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
                     raise e1
@@ -934,14 +1000,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+                self.get_client(device_id).insertRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertRecordsOfOneDevice(request)
+                        self.__default_client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1003,14 +1069,14 @@ class Session(object):
         # send request
         try:
             return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+                self.get_client(device_id).insertRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertRecordsOfOneDevice(request)
+                        self.__default_client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1054,15 +1120,17 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
+        request = self.gen_insert_tablet_req(tablet)
         try:
-            request = self.gen_insert_tablet_req(tablet)
-            return Session.verify_success(self.__client.testInsertTablet(request))
+            return Session.verify_success(
+                self.get_client(tablet.get_device_id()).testInsertTablet(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.testInsertTablet(request)
+                        self.__default_client.testInsertTablet(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -1076,14 +1144,17 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
+        request = self.gen_insert_tablets_req(tablet_list)
         try:
-            request = self.gen_insert_tablets_req(tablet_list)
-            return Session.verify_success(self.__client.testInsertTablets(request))
+            return Session.verify_success(
+                self.__default_client.testInsertTablets(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
+                    request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.testInsertTablets(request)
+                        self.__default_client.testInsertTablets(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("test insert fails because: ", e1)
@@ -1142,13 +1213,13 @@ class Session(object):
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
         try:
-            resp = self.__client.executeQueryStatement(request)
+            resp = self.__default_client.executeQueryStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeQueryStatement(request)
+                    resp = self.__default_client.executeQueryStatement(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1161,7 +1232,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1175,13 +1246,13 @@ class Session(object):
         """
         request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
         try:
-            resp = self.__client.executeUpdateStatement(request)
+            resp = self.__default_client.executeUpdateStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeUpdateStatement(request)
+                    resp = self.__default_client.executeUpdateStatement(request)
                 except TTransport.TException as e1:
                     logger.exception(
                         "execution of non-query statement fails because: ", e1
@@ -1196,13 +1267,13 @@ class Session(object):
             self.__session_id, sql, self.__statement_id, timeout
         )
         try:
-            resp = self.__client.executeStatement(request)
+            resp = self.__default_client.executeStatement(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeStatement(request)
+                    resp = self.__default_client.executeStatement(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of statement fails because: ", e1)
                     raise e1
@@ -1216,7 +1287,7 @@ class Session(object):
                 resp.dataTypeList,
                 resp.columnNameIndexMap,
                 resp.queryId,
-                self.__client,
+                self.__default_client,
                 self.__statement_id,
                 self.__session_id,
                 resp.queryDataSet,
@@ -1276,7 +1347,7 @@ class Session(object):
         if self.__zone_id is not None:
             return self.__zone_id
         try:
-            resp = self.__client.getTimeZone(self.__session_id)
+            resp = self.__default_client.getTimeZone(self.__session_id)
         except TTransport.TException as e:
             raise RuntimeError("Could not get time zone because: ", e)
         return resp.timeZone
@@ -1284,7 +1355,7 @@ class Session(object):
     def set_time_zone(self, zone_id):
         request = TSSetTimeZoneReq(self.__session_id, zone_id)
         try:
-            status = self.__client.setTimeZone(request)
+            status = self.__default_client.setTimeZone(request)
             logger.debug(
                 "setting time zone_id as {}, message: {}".format(
                     zone_id, status.message
@@ -1354,13 +1425,13 @@ class Session(object):
             enableRedirectQuery=False,
         )
         try:
-            resp = self.__client.executeRawDataQuery(request)
+            resp = self.__default_client.executeRawDataQuery(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeRawDataQuery(request)
+                    resp = self.__default_client.executeRawDataQuery(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1373,7 +1444,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1396,13 +1467,13 @@ class Session(object):
             enableRedirectQuery=False,
         )
         try:
-            resp = self.__client.executeLastDataQuery(request)
+            resp = self.__default_client.executeLastDataQuery(request)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     request.statementId = self.__statement_id
-                    resp = self.__client.executeLastDataQuery(request)
+                    resp = self.__default_client.executeLastDataQuery(request)
                 except TTransport.TException as e1:
                     logger.exception("execution of query statement fails because: ", e1)
                     raise e1
@@ -1415,7 +1486,7 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
+            self.__default_client,
             self.__statement_id,
             self.__session_id,
             resp.queryDataSet,
@@ -1449,14 +1520,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+                self.get_client(device_id).insertStringRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecordsOfOneDevice(request)
+                        self.__default_client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1481,14 +1552,14 @@ class Session(object):
         )
         try:
             return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+                self.get_client(device_id).insertStringRecordsOfOneDevice(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.insertStringRecordsOfOneDevice(request)
+                        self.__default_client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("insert fails because: ", e1)
@@ -1501,8 +1572,8 @@ class Session(object):
             return False
         connected = False
         for i in range(1, self.RETRY_NUM + 1):
-            if self.__transport is not None:
-                self.__transport.close()
+            if self.__default_connection.transport is not None:
+                self.__default_connection.transport.close()
                 curr_host_index = random.randint(0, len(self.__hosts))
                 try_host_num = 0
                 for j in range(curr_host_index, len(self.__hosts)):
@@ -1515,7 +1586,10 @@ class Session(object):
                         j = -1
                     try_host_num += 1
                     try:
-                        self.init_connection(self.__default_endpoint)
+                        self.__default_connection = self.init_connection(
+                            self.__default_endpoint
+                        )
+                        self.__default_client = self.__default_connection.client
                         connected = True
                     except TTransport.TException as e:
                         continue
@@ -1524,6 +1598,37 @@ class Session(object):
                 break
         return connected
 
+    def get_client(self, device_id):
+        """Return the client cached for device_id, falling back to the default."""
+        if self.__enable_redirection:
+            # dict.has_key() was removed in Python 3; .get() yields None on a miss.
+            endpoint = self.__device_id_to_endpoint.get(device_id)
+            if endpoint is not None:
+                connection = self.__endpoint_to_connection.get(endpoint)
+                if connection is not None:
+                    return connection.client
+        return self.__default_client
+
+    def handle_redirection(self, device_id, endpoint):
+        """Cache the endpoint (and a connection to it) that serves device_id."""
+        # "0.0.0.0" is compared with ==; `is` tests identity, not string equality.
+        if not self.__enable_redirection or endpoint.ip == "0.0.0.0":
+            return
+        if self.__device_id_to_endpoint.get(device_id) != endpoint:
+            self.__device_id_to_endpoint[device_id] = endpoint
+        # NOTE(review): endpoint is used as a dict key; confirm it is hashable.
+        # `in` replaces dict.has_key(), which was removed in Python 3.
+        if endpoint in self.__endpoint_to_connection:
+            connection = self.__endpoint_to_connection[endpoint]
+        else:
+            try:
+                connection = self.init_connection(endpoint)
+            except Exception:
+                connection = None
+            self.__endpoint_to_connection[endpoint] = connection
+        if connection is None:
+            self.__device_id_to_endpoint.pop(device_id, None)
+
     def gen_insert_string_records_of_one_device_request(
         self,
         device_id,
@@ -1562,13 +1667,15 @@ class Session(object):
             self.__session_id, template.get_name(), bytes_array
         )
         try:
-            return Session.verify_success(self.__client.createSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.createSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.createSchemaTemplate(request)
+                        self.__default_client.createSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("create template fails because: ", e1)
@@ -1583,13 +1690,15 @@ class Session(object):
         """
         request = TSDropSchemaTemplateReq(self.__session_id, template_name)
         try:
-            return Session.verify_success(self.__client.dropSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.dropSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.dropSchemaTemplate(request)
+                        self.__default_client.dropSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("drop template fails because: ", e1)
@@ -1625,13 +1734,15 @@ class Session(object):
             list(map(lambda x: x.value, compressors)),
         )
         try:
-            return Session.verify_success(self.__client.appendSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.appendSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.appendSchemaTemplate(request)
+                        self.__default_client.appendSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("append template fails because: ", e1)
@@ -1647,13 +1758,15 @@ class Session(object):
         """
         request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
         try:
-            return Session.verify_success(self.__client.pruneSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.pruneSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.pruneSchemaTemplate(request)
+                        self.__default_client.pruneSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("prune template fails because: ", e1)
@@ -1669,13 +1782,15 @@ class Session(object):
         """
         request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
         try:
-            return Session.verify_success(self.__client.setSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.setSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.setSchemaTemplate(request)
+                        self.__default_client.setSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("set template fails because: ", e1)
@@ -1694,13 +1809,15 @@ class Session(object):
             self.__session_id, prefix_path, template_name
         )
         try:
-            return Session.verify_success(self.__client.unsetSchemaTemplate(request))
+            return Session.verify_success(
+                self.__default_client.unsetSchemaTemplate(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     return Session.verify_success(
-                        self.__client.unsetSchemaTemplate(request)
+                        self.__default_client.unsetSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
                     logger.exception("unset template fails because: ", e1)
@@ -1719,13 +1836,13 @@ class Session(object):
             TemplateQueryType.COUNT_MEASUREMENTS.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             return response.count
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.count
                 except TTransport.TException as e1:
@@ -1749,14 +1866,14 @@ class Session(object):
             path,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
@@ -1778,14 +1895,14 @@ class Session(object):
             self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
@@ -1809,14 +1926,14 @@ class Session(object):
             pattern,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1837,14 +1954,14 @@ class Session(object):
             TemplateQueryType.SHOW_TEMPLATES.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1862,14 +1979,14 @@ class Session(object):
             self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1889,14 +2006,14 @@ class Session(object):
             TemplateQueryType.SHOW_USING_TEMPLATES.value,
         )
         try:
-            response = self.__client.querySchemaTemplate(request)
+            response = self.__default_client.querySchemaTemplate(request)
             Session.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    response = self.__client.querySchemaTemplate(request)
+                    response = self.__default_client.querySchemaTemplate(request)
                     Session.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
@@ -1904,3 +2021,25 @@ class Session(object):
                     raise e1
             else:
                 raise e
+
+
+class SessionConnection(object):
+    """Bundles a client together with the transport it uses."""
+
+    def __init__(self, client, transport):
+        self.client, self.transport = client, transport
+
+    def close_connection(self, req):
+        """Call ``closeSession(req)`` on the client, then close the transport."""
+        try:
+            self.client.closeSession(req)
+        except TTransport.TException as e:
+            # A TException raised by closeSession is logged, not re-raised.
+            logger.exception(
+                "Error occurs when closing session at server. Maybe server is down. Error message: ",
+                exc_info=e,
+            )
+        finally:
+            transport = self.transport
+            if transport is not None:
+                transport.close()