You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2023/03/22 15:32:21 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 2280534d82 [To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)
2280534d82 is described below

commit 2280534d8289867b37df40617ec5502a4ec1f45d
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Mar 22 23:32:08 2023 +0800

    [To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)
---
 client-py/SessionExample.py                        |  10 +-
 client-py/iotdb/Session.py                         | 651 +++++++++++++++------
 client-py/tests/test_dataframe.py                  |   5 +-
 client-py/tests/test_delete_data.py                |   9 +-
 .../UserGuide/API/Programming-Python-Native-API.md |  21 +-
 .../UserGuide/API/Programming-Python-Native-API.md |  21 +-
 6 files changed, 523 insertions(+), 194 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index bbc9669527..6dda1c49e0 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -29,7 +29,14 @@ ip = "127.0.0.1"
 port_ = "6667"
 username_ = "root"
 password_ = "root"
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8",
+)
 session.open(False)
 
 # set and delete storage groups
@@ -215,6 +222,7 @@ np_tablet_unsorted = NumpyTablet(
     np_values_unsorted,
     np_timestamps_unsorted,
 )
+
 session.insert_tablet(np_tablet_unsorted)
 print(np_tablet_unsorted.get_timestamps())
 for value in np_tablet_unsorted.get_values():
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 145fd94370..cd5a12f67f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,14 +16,21 @@
 # under the License.
 #
 import logging
+import random
 import struct
 import time
 
-from iotdb.utils.SessionDataSet import SessionDataSet
 
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
+from iotdb.utils.SessionDataSet import SessionDataSet
+from .thrift.rpc.ttypes import (
+    EndPoint,
+    TSRawDataQueryReq,
+    TSLastDataQueryReq,
+    TSInsertStringRecordsOfOneDeviceReq,
+)
 from .thrift.rpc.TSIService import (
     Client,
     TSCreateTimeseriesReq,
@@ -34,7 +41,6 @@ from .thrift.rpc.TSIService import (
     TSExecuteStatementReq,
     TSOpenSessionReq,
     TSCreateMultiTimeseriesReq,
-    TSCreateSchemaTemplateReq,
     TSCloseSessionReq,
     TSInsertTabletsReq,
     TSInsertRecordsReq,
@@ -65,6 +71,7 @@ class Session(object):
     DEFAULT_USER = "root"
     DEFAULT_PASSWORD = "root"
     DEFAULT_ZONE_ID = time.strftime("%z")
+    RETRY_NUM = 3
 
     def __init__(
         self,
@@ -77,6 +84,9 @@ class Session(object):
     ):
         self.__host = host
         self.__port = port
+        self.__hosts = None
+        self.__ports = None
+        self.__default_endpoint = EndPoint(self.__host, self.__port)
         self.__user = user
         self.__password = password
         self.__fetch_size = fetch_size
@@ -87,21 +97,60 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__enable_rpc_compression = None
+
+    @classmethod
+    def init_from_node_urls(
+        cls,
+        node_urls,
+        user=DEFAULT_USER,
+        password=DEFAULT_PASSWORD,
+        fetch_size=DEFAULT_FETCH_SIZE,
+        zone_id=DEFAULT_ZONE_ID,
+    ):
+        if node_urls is None:
+            raise RuntimeError("node urls is empty")
+        session = Session(None, None, user, password, fetch_size, zone_id)
+        session.__hosts = []
+        session.__ports = []
+        for node_url in node_urls:
+            split = node_url.split(":")
+            session.__hosts.append(split[0])
+            session.__ports.append(split[1])
+        session.__host = session.__hosts[0]
+        session.__port = session.__ports[0]
+        session.__default_endpoint = EndPoint(session.__host, session.__port)
+        return session
 
     def open(self, enable_rpc_compression):
         if not self.__is_close:
             return
+        self.__enable_rpc_compression = enable_rpc_compression
+        if self.__hosts is None:
+            self.init_connection(self.__default_endpoint)
+        else:
+            for i in range(0, len(self.__hosts)):
+                self.__default_endpoint = EndPoint(self.__hosts[i], self.__ports[i])
+                try:
+                    self.init_connection(self.__default_endpoint)
+                except Exception as e:
+                    if not self.reconnect():
+                        logger.error("Cluster has no nodes to connect")
+                        raise e
+                break
+
+    def init_connection(self, endpoint):
         self.__transport = TTransport.TFramedTransport(
-            TSocket.TSocket(self.__host, self.__port)
+            TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
         if not self.__transport.isOpen():
             try:
                 self.__transport.open()
             except TTransport.TTransportException as e:
-                logger.exception("TTransportException!", exc_info=e)
+                raise e
 
-        if enable_rpc_compression:
+        if self.__enable_rpc_compression:
             self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
         else:
             self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -133,6 +182,7 @@ class Session(object):
         except Exception as e:
             self.__transport.close()
             logger.exception("session closed because: ", exc_info=e)
+            raise e
 
         if self.__zone_id is not None:
             self.set_time_zone(self.__zone_id)
@@ -165,12 +215,21 @@ class Session(object):
         set one storage group
         :param group_name: String, storage group name (starts from root)
         """
-        status = self.__client.setStorageGroup(self.__session_id, group_name)
-        logger.debug(
-            "setting storage group {} message: {}".format(group_name, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.setStorageGroup(self.__session_id, group_name)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.setStorageGroup(self.__session_id, group_name)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("create databases fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_storage_group(self, storage_group):
         """
@@ -185,14 +244,23 @@ class Session(object):
         delete multiple storage groups.
         :param storage_group_lst: List, paths of the target storage groups.
         """
-        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
-        logger.debug(
-            "delete storage group(s) {} message: {}".format(
-                storage_group_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteStorageGroups(
+                            self.__session_id, storage_group_lst
+                        )
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("delete database fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_time_series(
         self,
@@ -230,12 +298,20 @@ class Session(object):
             attributes,
             alias,
         )
-        status = self.__client.createTimeseries(request)
-        logger.debug(
-            "creating time series {} message: {}".format(ts_path, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_aligned_time_series(
         self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -260,14 +336,21 @@ class Session(object):
             encoding_lst,
             compressor_lst,
         )
-        status = self.__client.createAlignedTimeseries(request)
-        logger.debug(
-            "creating aligned time series of device {} message: {}".format(
-                measurements_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.createAlignedTimeseries(request)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createAlignedTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            raise e
 
     def create_multi_time_series(
         self,
@@ -306,28 +389,39 @@ class Session(object):
             attributes_lst,
             alias_lst,
         )
-        status = self.__client.createMultiTimeseries(request)
-        logger.debug(
-            "creating multiple time series {} message: {}".format(
-                ts_path_lst, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createMultiTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createMultiTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating multi time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_time_series(self, paths_list):
         """
         delete multiple time series, including data and schema
         :param paths_list: List of time series path, which should be complete (starts from root)
         """
-        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
-        logger.debug(
-            "deleting multiple time series {} message: {}".format(
-                paths_list, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteTimeseries(self.__session_id, paths_list)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("deleting time series fails because: ", e1)
+                    raise e1
 
     def check_time_series_exists(self, path):
         """
@@ -346,14 +440,21 @@ class Session(object):
         :param paths_list: time series list that the data in.
         :param end_time: data with time stamp less than or equal to time will be deleted.
         """
-        request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+        request = TSDeleteDataReq(
+            self.__session_id, paths_list, -9223372036854775808, end_time
+        )
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_data_in_range(self, paths_list, start_time, end_time):
         """
@@ -364,12 +465,17 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_str_record(self, device_id, timestamp, measurements, string_values):
         """special case for inserting one row of String (TEXT) value"""
@@ -377,18 +483,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values
-        )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
+            device_id, timestamp, measurements, string_values
         )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_str_record(
         self, device_id, timestamp, measurements, string_values
@@ -398,18 +509,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values, True
+            device_id, timestamp, measurements, string_values, True
         )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
@@ -427,14 +543,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -455,14 +575,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -482,14 +606,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values, True
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -510,14 +638,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -535,14 +667,19 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.testInsertRecord(request)
-        logger.debug(
-            "testing! insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -563,12 +700,19 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.testInsertRecords(request)
-        logger.debug(
-            "testing! insert multiple records, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecords(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_record_req(
         self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -588,11 +732,11 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+        self, device_id, timestamp, measurements, values, is_aligned=False
     ):
-        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+        if len(values) != len(measurements):
             raise RuntimeError(
-                "length of data types does not equal to length of values!"
+                "length of measurements does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
             self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -649,24 +793,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_tablets(self, tablet_lst):
         """
         insert multiple tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablet(self, tablet):
         """
@@ -680,26 +838,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet, True)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablets(self, tablet_lst):
         """
         insert multiple aligned tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(
-            self.gen_insert_tablets_req(tablet_lst, True)
-        )
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst, True)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -752,12 +922,22 @@ class Session(object):
         request = self.gen_insert_records_of_one_device_request(
             device_id, times_list, measurements_list, values_list, types_list
         )
-
-        # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -811,10 +991,22 @@ class Session(object):
         )
 
         # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_records_of_one_device_request(
         self,
@@ -852,14 +1044,21 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
-        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "testing! insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablet_req(tablet)
+            return Session.verify_success(self.__client.testInsertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.testInsertTablet(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_tablets(self, tablet_list):
         """
@@ -867,14 +1066,20 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
-        status = self.__client.testInsertTablets(
-            self.gen_insert_tablets_req(tablet_list)
-        )
-        logger.debug(
-            "testing! insert multiple tablets, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablets_req(tablet_list)
+            return Session.verify_success(self.__client.testInsertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertTablets(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -926,7 +1131,20 @@ class Session(object):
         request = TSExecuteStatementReq(
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
-        resp = self.__client.executeQueryStatement(request)
+        try:
+            resp = self.__client.executeQueryStatement(request)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeQueryStatement(request)
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+        Session.verify_success(resp.status)
         return SessionDataSet(
             sql,
             resp.columns,
@@ -949,12 +1167,22 @@ class Session(object):
         try:
             resp = self.__client.executeUpdateStatement(request)
             status = resp.status
-            logger.debug(
-                "execute non-query statement {} message: {}".format(sql, status.message)
-            )
             return Session.verify_success(status)
         except TTransport.TException as e:
-            raise RuntimeError("execution of non-query statement fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeUpdateStatement(request)
+                    status = resp.status
+                    return Session.verify_success(status)
+                except TTransport.TException as e1:
+                    logger.exception(
+                        "execution of non-query statement fails because: ", e1
+                    )
+                    raise e1
+            else:
+                raise e
 
     @staticmethod
     def value_to_bytes(data_types, values):
@@ -1075,7 +1303,20 @@ class Session(object):
             statementId=self.__statement_id,
             enableRedirectQuery=False,
         )
-        resp = self.__client.executeRawDataQuery(request)
+        try:
+            resp = self.__client.executeRawDataQuery(request)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeRawDataQuery(request)
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+        Session.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1104,8 +1345,20 @@ class Session(object):
             self.__statement_id,
             enableRedirectQuery=False,
         )
-
-        resp = self.__client.executeLastDataQuery(request)
+        try:
+            resp = self.__client.executeLastDataQuery(request)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeLastDataQuery(request)
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+        Session.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1118,3 +1371,29 @@ class Session(object):
             resp.queryDataSet,
             resp.ignoreTimeStamp,
         )
+
+    def reconnect(self):
+        if self.__hosts is None:
+            return False
+        connected = False
+        for i in range(1, self.RETRY_NUM + 1):
+            if self.__transport is not None:
+                self.__transport.close()
+                curr_host_index = random.randint(0, len(self.__hosts))
+                try_host_num = 0
+                for j in range(curr_host_index, len(self.__hosts)):
+                    if try_host_num == len(self.__hosts):
+                        break
+                    self.__default_endpoint = EndPoint(self.__hosts[j], self.__ports[j])
+                    if j == len(self.__hosts) - 1:
+                        j = -1
+                    try_host_num += 1
+                    try:
+                        self.init_connection(self.__default_endpoint)
+                        connected = True
+                    except TTransport.TException as e:
+                        continue
+                    break
+            if connected:
+                break
+        return connected
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index f957c2da42..05aa8eb141 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -67,11 +67,12 @@ def test_non_time_query():
         "tags",
         "attributes",
         "deadband",
-        "deadband parameters"
+        "deadband parameters",
     ]
     assert_array_equal(
         df.values,
-        [[
+        [
+            [
                 "root.device1.pressure",
                 None,
                 "root.device1",
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 7e9df9ac7e..b0a9f014c2 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -114,7 +114,9 @@ def test_delete_date():
             print_message("insert aligned record of one device failed")
 
         # execute delete data
-        session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+        session.delete_data(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_query_statement(
@@ -140,7 +142,9 @@ def test_delete_date():
         assert actual_count == expect_count
 
         # execute delete data
-        session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+        session.delete_data_in_range(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_query_statement(
@@ -175,4 +179,3 @@ else:
     print("Some test failed, please have a check")
     print("failed count: ", failed_count)
     exit(1)
-
diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md
index 4c4ee5e9c6..99ac94cfe6 100644
--- a/docs/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/UserGuide/API/Programming-Python-Native-API.md
@@ -59,7 +59,26 @@ session.close()
 * Initialize a Session
 
 ```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+    ip="127.0.0.1",
+    port="6667",
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
+```
+
+* Initialize a Session to connect multiple nodes
+
+```python
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
 ```
 
 * Open a session, with a parameter to specify whether to enable RPC compression
diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
index 5038e023b1..7df7994abf 100644
--- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
@@ -60,7 +60,26 @@ session.close()
 * 初始化 Session
 
 ```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+    ip="127.0.0.1",
+    port="6667",
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
+```
+
+* 初始化可连接多节点的 Session
+
+```python
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
 ```
 
 * 开启 Session,并决定是否开启 RPC 压缩