You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/04/10 07:46:09 UTC

[iotdb] branch master updated: [IOTDB-5752] Python Client supports write redirection (#9467)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c951bb6702 [IOTDB-5752] Python Client supports write redirection (#9467)
c951bb6702 is described below

commit c951bb670240e6eaff2fdafafeb061ae91921f4e
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Apr 10 15:46:02 2023 +0800

    [IOTDB-5752] Python Client supports write redirection (#9467)
---
 client-py/SessionExample.py                        |   3 +-
 client-py/iotdb/Session.py                         | 580 ++++++++++++++++-----
 .../UserGuide/API/Programming-Python-Native-API.md |   6 +-
 .../UserGuide/API/Programming-Python-Native-API.md |   6 +-
 4 files changed, 458 insertions(+), 137 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index c0ede83b2a..631b3e5d39 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -33,13 +33,14 @@ ip = "127.0.0.1"
 port_ = "6667"
 username_ = "root"
 password_ = "root"
-# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True)
 session = Session.init_from_node_urls(
     node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
     user="root",
     password="root",
     fetch_size=1024,
     zone_id="UTC+8",
+    enable_redirection=True,
 )
 session.open(False)
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..a1be74eae4 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -25,7 +25,7 @@ from thrift.transport import TSocket, TTransport
 from iotdb.utils.SessionDataSet import SessionDataSet
 from .template.Template import Template
 from .template.TemplateQueryType import TemplateQueryType
-from .thrift.common.ttypes import TEndPoint
+from .thrift.common.ttypes import TEndPoint, TSStatus
 from .thrift.rpc.IClientRPCService import (
     Client,
     TSCreateTimeseriesReq,
@@ -57,17 +57,6 @@ from .thrift.rpc.ttypes import (
     TSInsertStringRecordsOfOneDeviceReq,
 )
 
-# for debug
-# from IoTDBConstants import *
-# from SessionDataSet import SessionDataSet
-#
-# from thrift.protocol import TBinaryProtocol, TCompactProtocol
-# from thrift.transport import TSocket, TTransport
-#
-# from iotdb.rpc.IClientRPCService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
-#      TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
-#      TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
-# from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
 from .utils.IoTDBConstants import TSDataType
 
 logger = logging.getLogger("IoTDB")
@@ -91,6 +80,7 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         self.__host = host
         self.__port = port
@@ -101,13 +91,16 @@ class Session(object):
         self.__password = password
         self.__fetch_size = fetch_size
         self.__is_close = True
-        self.__transport = None
         self.__client = None
+        self.__default_connection = None
         self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
         self.__enable_rpc_compression = None
+        self.__enable_redirection = enable_redirection
+        self.__device_id_to_endpoint = None
+        self.__endpoint_to_connection = None
 
     @classmethod
     def init_from_node_urls(
@@ -117,16 +110,19 @@ class Session(object):
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
+        enable_redirection=True,
     ):
         if node_urls is None:
             raise RuntimeError("node urls is empty")
-        session = Session(None, None, user, password, fetch_size, zone_id)
+        session = Session(
+            None, None, user, password, fetch_size, zone_id, enable_redirection
+        )
         session.__hosts = []
         session.__ports = []
         for node_url in node_urls:
             split = node_url.split(":")
             session.__hosts.append(split[0])
-            session.__ports.append(split[1])
+            session.__ports.append(int(split[1]))
         session.__host = session.__hosts[0]
         session.__port = session.__ports[0]
         session.__default_endpoint = TEndPoint(session.__host, session.__port)
@@ -137,33 +133,44 @@ class Session(object):
             return
         self.__enable_rpc_compression = enable_rpc_compression
         if self.__hosts is None:
-            self.init_connection(self.__default_endpoint)
+            self.__default_connection = self.init_connection(self.__default_endpoint)
         else:
             for i in range(0, len(self.__hosts)):
                 self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
                 try:
-                    self.init_connection(self.__default_endpoint)
+                    self.__default_connection = self.init_connection(
+                        self.__default_endpoint
+                    )
                 except Exception as e:
                     if not self.reconnect():
                         logger.error("Cluster has no nodes to connect")
                         raise e
                 break
+        self.__client = self.__default_connection.client
+        self.__session_id = self.__default_connection.session_id
+        self.__statement_id = self.__default_connection.statement_id
+        self.__is_close = False
+        if self.__enable_redirection:
+            self.__device_id_to_endpoint = {}
+            self.__endpoint_to_connection = {
+                str(self.__default_endpoint): self.__default_connection
+            }
 
     def init_connection(self, endpoint):
-        self.__transport = TTransport.TFramedTransport(
+        transport = TTransport.TFramedTransport(
             TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
-        if not self.__transport.isOpen():
+        if not transport.isOpen():
             try:
-                self.__transport.open()
+                transport.open()
             except TTransport.TTransportException as e:
                 raise e
 
         if self.__enable_rpc_compression:
-            self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+            client = Client(TCompactProtocol.TCompactProtocol(transport))
         else:
-            self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+            client = Client(TBinaryProtocol.TBinaryProtocol(transport))
 
         open_req = TSOpenSessionReq(
             client_protocol=self.protocol_version,
@@ -174,7 +181,8 @@ class Session(object):
         )
 
         try:
-            open_resp = self.__client.openSession(open_req)
+            open_resp = client.openSession(open_req)
+            Session.verify_success(open_resp.status)
 
             if self.protocol_version != open_resp.serverProtocolVersion:
                 logger.exception(
@@ -186,20 +194,23 @@ class Session(object):
                 if open_resp.serverProtocolVersion == 0:
                     raise TTransport.TException(message="Protocol not supported.")
 
-            self.__session_id = open_resp.sessionId
-            self.__statement_id = self.__client.requestStatementId(self.__session_id)
+            session_id = open_resp.sessionId
+            statement_id = client.requestStatementId(session_id)
 
         except Exception as e:
-            self.__transport.close()
+            transport.close()
             logger.exception("session closed because: ", exc_info=e)
             raise e
 
         if self.__zone_id is not None:
-            self.set_time_zone(self.__zone_id)
+            request = TSSetTimeZoneReq(session_id, self.__zone_id)
+            try:
+                client.setTimeZone(request)
+            except TTransport.TException as e:
+                raise RuntimeError("Could not set time zone because: ", e)
         else:
             self.__zone_id = self.get_time_zone()
-
-        self.__is_close = False
+        return SessionConnection(client, transport, session_id, statement_id)
 
     def is_open(self):
         return not self.__is_close
@@ -207,18 +218,16 @@ class Session(object):
     def close(self):
         if self.__is_close:
             return
-        req = TSCloseSessionReq(self.__session_id)
         try:
-            self.__client.closeSession(req)
-        except TTransport.TException as e:
-            logger.exception(
-                "Error occurs when closing session at server. Maybe server is down. Error message: ",
-                exc_info=e,
-            )
+            if self.__enable_redirection:
+                for connection in self.__endpoint_to_connection.values():
+                    req = TSCloseSessionReq(connection.session_id)
+                    connection.close_connection(req)
+            else:
+                req = TSCloseSessionReq(self.__session_id)
+                self.__default_connection.close_connection(req)
         finally:
             self.__is_close = True
-            if self.__transport is not None:
-                self.__transport.close()
 
     def set_storage_group(self, group_name):
         """
@@ -497,7 +506,13 @@ class Session(object):
             device_id, timestamp, measurements, string_values
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertStringRecord(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -523,7 +538,13 @@ class Session(object):
             device_id, timestamp, measurements, string_values, True
         )
         try:
-            return Session.verify_success(self.__client.insertStringRecord(request))
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertStringRecord(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -554,7 +575,13 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertRecord(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -567,7 +594,7 @@ class Session(object):
                 raise e
 
     def insert_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+        self, device_ids: list, times, measurements_lst, types_lst, values_lst
     ):
         """
         insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -582,21 +609,57 @@ class Session(object):
         for types in types_lst:
             data_types = [data_type.value for data_type in types]
             type_values_lst.append(data_types)
-        request = self.gen_insert_records_req(
-            device_ids, times, measurements_lst, type_values_lst, values_lst
-        )
-        try:
-            return Session.verify_success(self.__client.insertRecords(request))
-        except TTransport.TException as e:
-            if self.reconnect():
+        if self.__enable_redirection:
+            request_group = {}
+            for i in range(len(device_ids)):
+                connection = self.get_connection(device_ids[i])
+                request = request_group.setdefault(
+                    connection.client,
+                    TSInsertRecordsReq(connection.session_id, [], [], [], []),
+                )
+                request.prefixPaths.append(device_ids[i])
+                request.timestamps.append(times[i])
+                request.measurementsList.append(measurements_lst[i])
+                request.valuesList.append(
+                    Session.value_to_bytes(type_values_lst[i], values_lst[i])
+                )
+            for client, request in request_group.items():
                 try:
-                    request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
-                except TTransport.TException as e1:
-                    logger.exception("insert fails because: ", e1)
-                    raise e1
-            else:
-                raise e
+                    Session.verify_success_with_redirection_for_multi_devices(
+                        client.insertRecords(request), request.prefixPaths
+                    )
+                except RedirectException as e:
+                    for device, endpoint in e.device_to_endpoint.items():
+                        self.handle_redirection(device, endpoint)
+                except TTransport.TException as e:
+                    if self.reconnect():
+                        try:
+                            request.sessionId = self.__session_id
+                            Session.verify_success(self.__client.insertRecords(request))
+                        except TTransport.TException as e1:
+                            logger.exception("insert fails because: ", e1)
+                            raise e1
+                    else:
+                        raise e
+            return 0
+        else:
+            request = self.gen_insert_records_req(
+                device_ids, times, measurements_lst, type_values_lst, values_lst
+            )
+            try:
+                return Session.verify_success(self.__client.insertRecords(request))
+            except TTransport.TException as e:
+                if self.reconnect():
+                    try:
+                        request.sessionId = self.__session_id
+                        return Session.verify_success(
+                            self.__client.insertRecords(request)
+                        )
+                    except TTransport.TException as e1:
+                        logger.exception("insert fails because: ", e1)
+                        raise e1
+                else:
+                    raise e
 
     def insert_aligned_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -617,7 +680,13 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values, True
         )
         try:
-            return Session.verify_success(self.__client.insertRecord(request))
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertRecord(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -645,21 +714,57 @@ class Session(object):
         for types in types_lst:
             data_types = [data_type.value for data_type in types]
             type_values_lst.append(data_types)
-        request = self.gen_insert_records_req(
-            device_ids, times, measurements_lst, type_values_lst, values_lst, True
-        )
-        try:
-            return Session.verify_success(self.__client.insertRecords(request))
-        except TTransport.TException as e:
-            if self.reconnect():
+        if self.__enable_redirection:
+            request_group = {}
+            for i in range(len(device_ids)):
+                connection = self.get_connection(device_ids[i])
+                request = request_group.setdefault(
+                    connection.client,
+                    TSInsertRecordsReq(connection.session_id, [], [], [], [], True),
+                )
+                request.prefixPaths.append(device_ids[i])
+                request.timestamps.append(times[i])
+                request.measurementsList.append(measurements_lst[i])
+                request.valuesList.append(
+                    Session.value_to_bytes(type_values_lst[i], values_lst[i])
+                )
+            for client, request in request_group.items():
                 try:
-                    request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertRecords(request))
-                except TTransport.TException as e1:
-                    logger.exception("insert fails because: ", e1)
-                    raise e1
-            else:
-                raise e
+                    Session.verify_success_with_redirection_for_multi_devices(
+                        client.insertRecords(request), request.prefixPaths
+                    )
+                except RedirectException as e:
+                    for device, endpoint in e.device_to_endpoint.items():
+                        self.handle_redirection(device, endpoint)
+                except TTransport.TException as e:
+                    if self.reconnect():
+                        try:
+                            request.sessionId = self.__session_id
+                            Session.verify_success(self.__client.insertRecords(request))
+                        except TTransport.TException as e1:
+                            logger.exception("insert fails because: ", e1)
+                            raise e1
+                    else:
+                        raise e
+            return 0
+        else:
+            request = self.gen_insert_records_req(
+                device_ids, times, measurements_lst, type_values_lst, values_lst, True
+            )
+            try:
+                return Session.verify_success(self.__client.insertRecords(request))
+            except TTransport.TException as e:
+                if self.reconnect():
+                    try:
+                        request.sessionId = self.__session_id
+                        return Session.verify_success(
+                            self.__client.insertRecords(request)
+                        )
+                    except TTransport.TException as e1:
+                        logger.exception("insert fails because: ", e1)
+                        raise e1
+                else:
+                    raise e
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -805,7 +910,13 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            connection = self.get_connection(tablet.get_device_id())
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertTablet(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(tablet.get_device_id(), e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -822,19 +933,60 @@ class Session(object):
         insert multiple tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        request = self.gen_insert_tablets_req(tablet_lst)
-        try:
-            return Session.verify_success(self.__client.insertTablets(request))
-        except TTransport.TException as e:
-            if self.reconnect():
+        if self.__enable_redirection:
+            request_group = {}
+            for i in range(len(tablet_lst)):
+                connection = self.get_connection(tablet_lst[i].get_device_id())
+                request = request_group.setdefault(
+                    connection.client,
+                    TSInsertTabletsReq(
+                        connection.session_id, [], [], [], [], [], [], False
+                    ),
+                )
+                request.prefixPaths.append(tablet_lst[i].get_device_id())
+                request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
+                request.measurementsList.append(tablet_lst[i].get_measurements())
+                request.valuesList.append(tablet_lst[i].get_binary_values())
+                request.sizeList.append(tablet_lst[i].get_row_number())
+                data_type_values = [
+                    data_type.value for data_type in tablet_lst[i].get_data_types()
+                ]
+                request.typesList.append(data_type_values)
+            for client, request in request_group.items():
                 try:
-                    request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
-                except TTransport.TException as e1:
-                    logger.exception("insert fails because: ", e1)
-                    raise e1
-            else:
-                raise e
+                    Session.verify_success_with_redirection_for_multi_devices(
+                        client.insertTablets(request), request.prefixPaths
+                    )
+                except RedirectException as e:
+                    for device, endpoint in e.device_to_endpoint.items():
+                        self.handle_redirection(device, endpoint)
+                except TTransport.TException as e:
+                    if self.reconnect():
+                        try:
+                            request.sessionId = self.__session_id
+                            Session.verify_success(self.__client.insertTablets(request))
+                        except TTransport.TException as e1:
+                            logger.exception("insert fails because: ", e1)
+                            raise e1
+                    else:
+                        raise e
+            return 0
+        else:
+            request = self.gen_insert_tablets_req(tablet_lst)
+            try:
+                return Session.verify_success(self.__client.insertTablets(request))
+            except TTransport.TException as e:
+                if self.reconnect():
+                    try:
+                        request.sessionId = self.__session_id
+                        return Session.verify_success(
+                            self.__client.insertTablets(request)
+                        )
+                    except TTransport.TException as e1:
+                        logger.exception("insert fails because: ", e1)
+                        raise e1
+                else:
+                    raise e
 
     def insert_aligned_tablet(self, tablet):
         """
@@ -850,7 +1002,13 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet, True)
         try:
-            return Session.verify_success(self.__client.insertTablet(request))
+            connection = self.get_connection(tablet.get_device_id())
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertTablet(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(tablet.get_device_id(), e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -867,19 +1025,60 @@ class Session(object):
         insert multiple aligned tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        request = self.gen_insert_tablets_req(tablet_lst, True)
-        try:
-            return Session.verify_success(self.__client.insertTablets(request))
-        except TTransport.TException as e:
-            if self.reconnect():
+        if self.__enable_redirection:
+            request_group = {}
+            for i in range(len(tablet_lst)):
+                connection = self.get_connection(tablet_lst[i].get_device_id())
+                request = request_group.setdefault(
+                    connection.client,
+                    TSInsertTabletsReq(
+                        connection.session_id, [], [], [], [], [], [], True
+                    ),
+                )
+                request.prefixPaths.append(tablet_lst[i].get_device_id())
+                request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
+                request.measurementsList.append(tablet_lst[i].get_measurements())
+                request.valuesList.append(tablet_lst[i].get_binary_values())
+                request.sizeList.append(tablet_lst[i].get_row_number())
+                data_type_values = [
+                    data_type.value for data_type in tablet_lst[i].get_data_types()
+                ]
+                request.typesList.append(data_type_values)
+            for client, request in request_group.items():
                 try:
-                    request.sessionId = self.__session_id
-                    return Session.verify_success(self.__client.insertTablets(request))
-                except TTransport.TException as e1:
-                    logger.exception("insert fails because: ", e1)
-                    raise e1
-            else:
-                raise e
+                    Session.verify_success_with_redirection_for_multi_devices(
+                        client.insertTablets(request), request.prefixPaths
+                    )
+                except RedirectException as e:
+                    for device, endpoint in e.device_to_endpoint.items():
+                        self.handle_redirection(device, endpoint)
+                except TTransport.TException as e:
+                    if self.reconnect():
+                        try:
+                            request.sessionId = self.__session_id
+                            Session.verify_success(self.__client.insertTablets(request))
+                        except TTransport.TException as e1:
+                            logger.exception("insert fails because: ", e1)
+                            raise e1
+                    else:
+                        raise e
+            return 0
+        else:
+            request = self.gen_insert_tablets_req(tablet_lst, True)
+            try:
+                return Session.verify_success(self.__client.insertTablets(request))
+            except TTransport.TException as e:
+                if self.reconnect():
+                    try:
+                        request.sessionId = self.__session_id
+                        return Session.verify_success(
+                            self.__client.insertTablets(request)
+                        )
+                    except TTransport.TException as e1:
+                        logger.exception("insert fails because: ", e1)
+                        raise e1
+                else:
+                    raise e
 
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -910,7 +1109,6 @@ class Session(object):
         :param measurements_list: measurements list
         :param types_list: types list
         :param values_list: values list
-        :param have_sorted: have these list been sorted by timestamp
         """
         # check parameter
         size = len(times_list)
@@ -933,9 +1131,13 @@ class Session(object):
             device_id, times_list, measurements_list, values_list, types_list
         )
         try:
-            return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertRecordsOfOneDevice(request)
             )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -1002,9 +1204,13 @@ class Session(object):
 
         # send request
         try:
-            return Session.verify_success(
-                self.__client.insertRecordsOfOneDevice(request)
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertRecordsOfOneDevice(request)
             )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -1054,8 +1260,8 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
+        request = self.gen_insert_tablet_req(tablet)
         try:
-            request = self.gen_insert_tablet_req(tablet)
             return Session.verify_success(self.__client.testInsertTablet(request))
         except TTransport.TException as e:
             if self.reconnect():
@@ -1076,12 +1282,13 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
+        request = self.gen_insert_tablets_req(tablet_list)
         try:
-            request = self.gen_insert_tablets_req(tablet_list)
             return Session.verify_success(self.__client.testInsertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
+                    request.sessionId = self.__session_id
                     return Session.verify_success(
                         self.__client.testInsertTablets(request)
                     )
@@ -1136,7 +1343,8 @@ class Session(object):
         """
         execute query sql statement and returns SessionDataSet
         :param sql: String, query sql statement
-        :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
+        :param timeout:
+        :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py):
         """
         request = TSExecuteStatementReq(
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
@@ -1302,7 +1510,7 @@ class Session(object):
         return True
 
     @staticmethod
-    def verify_success(status):
+    def verify_success(status: TSStatus):
         """
         verify success of operation
         :param status: execution result status
@@ -1320,7 +1528,7 @@ class Session(object):
         raise RuntimeError(str(status.code) + ": " + status.message)
 
     @staticmethod
-    def verify_success_by_list(status_list):
+    def verify_success_by_list(status_list: list):
         """
         verify success of operation
         :param status_list: execution result status
@@ -1334,6 +1542,28 @@ class Session(object):
                 message += status.message + "; "
         raise RuntimeError(message)
 
+    @staticmethod
+    def verify_success_with_redirection(status: TSStatus):
+        Session.verify_success(status)  # plain status check first (see verify_success)
+        if status.redirectNode is not None:  # status carries a redirect target
+            raise RedirectException(status.redirectNode)  # callers catch this to redirect
+        return 0
+
+    @staticmethod
+    def verify_success_with_redirection_for_multi_devices(
+        status: TSStatus, devices: list
+    ):
+        """Verify status; raise RedirectException(device -> endpoint) or return 0."""
+        Session.verify_success(status)
+        if status.code in (Session.MULTIPLE_ERROR, Session.REDIRECTION_RECOMMEND):
+            # subStatus[i] is paired with devices[i]; a missing subStatus list
+            # is treated as empty instead of failing on len(None).
+            pairs = zip(devices, status.subStatus or [])
+            raise RedirectException(
+                {d: s.redirectNode for d, s in pairs if s.redirectNode is not None}
+            )
+        return 0
+
     def execute_raw_data_query(
         self, paths: list, start_time: int, end_time: int
     ) -> SessionDataSet:
@@ -1448,9 +1678,13 @@ class Session(object):
             device_id, times, measurements_list, values_list, have_sorted, False
         )
         try:
-            return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertStringRecordsOfOneDevice(request)
             )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -1480,9 +1714,13 @@ class Session(object):
             device_id, times, measurements_list, values, have_sorted, True
         )
         try:
-            return Session.verify_success(
-                self.__client.insertStringRecordsOfOneDevice(request)
+            connection = self.get_connection(device_id)
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertStringRecordsOfOneDevice(request)
             )
+        except RedirectException as e:
+            return self.handle_redirection(device_id, e.redirect_node)
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -1501,29 +1739,71 @@ class Session(object):
             return False
         connected = False
         for i in range(1, self.RETRY_NUM + 1):
-            if self.__transport is not None:
-                self.__transport.close()
-                curr_host_index = random.randint(0, len(self.__hosts))
-                try_host_num = 0
-                for j in range(curr_host_index, len(self.__hosts)):
-                    if try_host_num == len(self.__hosts):
-                        break
-                    self.__default_endpoint = TEndPoint(
-                        self.__hosts[j], self.__ports[j]
-                    )
-                    if j == len(self.__hosts) - 1:
-                        j = -1
-                    try_host_num += 1
-                    try:
-                        self.init_connection(self.__default_endpoint)
-                        connected = True
-                    except TTransport.TException as e:
-                        continue
+            if (
+                self.__default_connection is not None
+                and self.__default_connection.transport is not None
+            ):
+                self.__default_connection.transport.close()
+            curr_host_index = random.randint(0, len(self.__hosts))
+            try_host_num = 0
+            for j in range(curr_host_index, len(self.__hosts)):
+                if try_host_num == len(self.__hosts):
                     break
+                self.__default_endpoint = TEndPoint(self.__hosts[j], self.__ports[j])
+                if j == len(self.__hosts) - 1:
+                    j = -1
+                try_host_num += 1
+                try:
+                    self.__default_connection = self.init_connection(
+                        self.__default_endpoint
+                    )
+                    self.__client = self.__default_connection.client
+                    self.__session_id = self.__default_connection.session_id
+                    self.__statement_id = self.__default_connection.statement_id
+                    connected = True
+                    if self.__enable_redirection:
+                        self.__endpoint_to_connection = {
+                            str(self.__default_endpoint): self.__default_connection
+                        }
+                except TTransport.TException:
+                    continue
+                break
             if connected:
                 break
         return connected
 
+    def get_connection(self, device_id):
+        """
+        pick the connection used to write device_id
+        :return: cached redirected connection, otherwise the default connection
+        """
+        if self.__enable_redirection and device_id in self.__device_id_to_endpoint:
+            key = str(self.__device_id_to_endpoint[device_id])
+            if key in self.__endpoint_to_connection:
+                return self.__endpoint_to_connection[key]
+        return self.__default_connection
+
+    def handle_redirection(self, device_id, endpoint: TEndPoint):
+        """Record the node recommended for device_id, connecting on demand; returns 0."""
+        if not self.__enable_redirection or endpoint.ip == "0.0.0.0":
+            return 0
+        key = str(endpoint)
+        connection = self.__endpoint_to_connection.get(key)
+        if connection is None:
+            try:
+                connection = self.init_connection(endpoint)
+            except Exception as e:
+                # Best effort: keep writing through the default connection and
+                # do not cache the failure, so a later redirection can retry.
+                logger.warning(
+                    "Cannot connect to redirected node %s", key, exc_info=e
+                )
+                self.__device_id_to_endpoint.pop(device_id, None)
+                return 0
+            self.__endpoint_to_connection[key] = connection
+        self.__device_id_to_endpoint[device_id] = endpoint
+        return 0
+
     def gen_insert_string_records_of_one_device_request(
         self,
         device_id,
@@ -1607,9 +1887,10 @@ class Session(object):
         is_aligned: bool = False,
     ):
         """
-        add measurements in the template, the template must already create. This function adds some measurements node.
+        add measurements to the template; the template must already exist. This function adds measurement nodes.
         :param template_name: template name, string list, like ["name_x", "name_y", "name_z"]
-        :param measurements_path: when ths is_aligned is True, recommend the name like a.b, like [python.x, python.y, iotdb.z]
+        :param measurements_path: when is_aligned is True, names of the form a.b are recommended,
+        like [python.x, python.y, iotdb.z]
         :param data_types: using TSDataType(see IoTDBConstants.py)
         :param encodings: using TSEncoding(see IoTDBConstants.py)
         :param compressors: using Compressor(see IoTDBConstants.py)
@@ -1800,7 +2081,7 @@ class Session(object):
         """
         show all measurements under the pattern in template
         :param template_name: template name
-        :param pattern: parent path, if default, show all measurements
+        :param pattern: parent path; if left at its default, all measurements are shown
         """
         request = TSQueryTemplateReq(
             self.__session_id,
@@ -1904,3 +2185,38 @@ class Session(object):
                     raise e1
             else:
                 raise e
+
+
+class SessionConnection(object):
+    """Client, transport, session id and statement id of one server connection."""
+
+    def __init__(self, client, transport, session_id, statement_id):
+        self.client = client
+        self.transport = transport
+        self.session_id = session_id
+        self.statement_id = statement_id
+
+    def close_connection(self, req):
+        """
+        close the session on the server, then always close the transport
+        :param req: request object passed to client.closeSession
+        """
+        try:
+            self.client.closeSession(req)
+        except TTransport.TException as e:
+            logger.exception(
+                "Error occurs when closing session at server. Maybe server is down. Error message: ",
+                exc_info=e,
+            )
+        finally:
+            if self.transport is not None:
+                self.transport.close()
+
+
+class RedirectException(Exception):
+    """Holds redirect_node (TEndPoint) or device_to_endpoint; the other is None."""
+    def __init__(self, redirect_info):
+        Exception.__init__(self)
+        single_node = isinstance(redirect_info, TEndPoint)
+        self.redirect_node = redirect_info if single_node else None
+        self.device_to_endpoint = None if single_node else redirect_info
diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md
index 1233a071dd..44fdd90cc3 100644
--- a/docs/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/UserGuide/API/Programming-Python-Native-API.md
@@ -63,7 +63,8 @@ session = Session(
     user="root",
     password="root",
     fetch_size=1024,
-    zone_id="UTC+8"
+    zone_id="UTC+8",
+    enable_redirection=True
 )
 ```
 
@@ -75,7 +76,8 @@ session = Session.init_from_node_urls(
     user="root",
     password="root",
     fetch_size=1024,
-    zone_id="UTC+8"
+    zone_id="UTC+8",
+    enable_redirection=True
 )
 ```
 
diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
index f5025f5d89..8e09dbdac8 100644
--- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
@@ -64,7 +64,8 @@ session = Session(
     user="root",
     password="root",
     fetch_size=1024,
-    zone_id="UTC+8"
+    zone_id="UTC+8",
+    enable_redirection=True
 )
 ```
 
@@ -76,7 +77,8 @@ session = Session.init_from_node_urls(
     user="root",
     password="root",
     fetch_size=1024,
-    zone_id="UTC+8"
+    zone_id="UTC+8",
+    enable_redirection=True
 )
 ```