You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/22 13:51:25 UTC

[iotdb] branch multi_node_13 created (now aa08d878a0)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch multi_node_13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at aa08d878a0 [IOTDB-5711] Support connecting multiple nodes in Python API (#9400)

This branch includes the following new commits:

     new aa08d878a0 [IOTDB-5711] Support connecting multiple nodes in Python API (#9400)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5711] Support connecting multiple nodes in Python API (#9400)

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch multi_node_13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa08d878a06d449a542abce3ca0613a54cb19170
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Mar 22 09:51:58 2023 +0800

    [IOTDB-5711] Support connecting multiple nodes in Python API (#9400)
---
 client-py/SessionExample.py                        |  10 +-
 client-py/iotdb/Session.py                         | 812 ++++++++++++++++-----
 client-py/tests/test_dataframe.py                  |   5 +-
 client-py/tests/test_delete_data.py                |   9 +-
 .../UserGuide/API/Programming-Python-Native-API.md |  21 +-
 .../UserGuide/API/Programming-Python-Native-API.md |  21 +-
 6 files changed, 672 insertions(+), 206 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index bbc9669527..6dda1c49e0 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -29,7 +29,14 @@ ip = "127.0.0.1"
 port_ = "6667"
 username_ = "root"
 password_ = "root"
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8",
+)
 session.open(False)
 
 # set and delete storage groups
@@ -215,6 +222,7 @@ np_tablet_unsorted = NumpyTablet(
     np_values_unsorted,
     np_timestamps_unsorted,
 )
+
 session.insert_tablet(np_tablet_unsorted)
 print(np_tablet_unsorted.get_timestamps())
 for value in np_tablet_unsorted.get_values():
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 145fd94370..cf6e8ebc22 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,14 +16,21 @@
 # under the License.
 #
 import logging
+import random
 import struct
 import time
 
-from iotdb.utils.SessionDataSet import SessionDataSet
 
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
+from iotdb.utils.SessionDataSet import SessionDataSet
+from .thrift.rpc.ttypes import (
+    EndPoint,
+    TSRawDataQueryReq,
+    TSLastDataQueryReq,
+    TSInsertStringRecordsOfOneDeviceReq,
+)
 from .thrift.rpc.TSIService import (
     Client,
     TSCreateTimeseriesReq,
@@ -34,7 +41,6 @@ from .thrift.rpc.TSIService import (
     TSExecuteStatementReq,
     TSOpenSessionReq,
     TSCreateMultiTimeseriesReq,
-    TSCreateSchemaTemplateReq,
     TSCloseSessionReq,
     TSInsertTabletsReq,
     TSInsertRecordsReq,
@@ -65,6 +71,7 @@ class Session(object):
     DEFAULT_USER = "root"
     DEFAULT_PASSWORD = "root"
     DEFAULT_ZONE_ID = time.strftime("%z")
+    RETRY_NUM = 3
 
     def __init__(
         self,
@@ -77,6 +84,9 @@ class Session(object):
     ):
         self.__host = host
         self.__port = port
+        self.__hosts = None
+        self.__ports = None
+        self.__default_endpoint = EndPoint(self.__host, self.__port)
         self.__user = user
         self.__password = password
         self.__fetch_size = fetch_size
@@ -87,21 +97,60 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__enable_rpc_compression = None
+
+    @classmethod
+    def init_from_node_urls(
+        cls,
+        node_urls,
+        user=DEFAULT_USER,
+        password=DEFAULT_PASSWORD,
+        fetch_size=DEFAULT_FETCH_SIZE,
+        zone_id=DEFAULT_ZONE_ID,
+    ):
+        if node_urls is None:
+            raise RuntimeError("node urls is empty")
+        session = Session(None, None, user, password, fetch_size, zone_id)
+        session.__hosts = []
+        session.__ports = []
+        for node_url in node_urls:
+            split = node_url.split(":")
+            session.__hosts.append(split[0])
+            session.__ports.append(split[1])
+        session.__host = session.__hosts[0]
+        session.__port = session.__ports[0]
+        session.__default_endpoint = EndPoint(session.__host, session.__port)
+        return session
 
     def open(self, enable_rpc_compression):
         if not self.__is_close:
             return
+        self.__enable_rpc_compression = enable_rpc_compression
+        if self.__hosts is None:
+            self.init_connection(self.__default_endpoint)
+        else:
+            for i in range(0, len(self.__hosts)):
+                self.__default_endpoint = EndPoint(self.__hosts[i], self.__ports[i])
+                try:
+                    self.init_connection(self.__default_endpoint)
+                except Exception as e:
+                    if not self.reconnect():
+                        logger.error("Cluster has no nodes to connect")
+                        raise e
+                break
+
+    def init_connection(self, endpoint):
         self.__transport = TTransport.TFramedTransport(
-            TSocket.TSocket(self.__host, self.__port)
+            TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
         if not self.__transport.isOpen():
             try:
                 self.__transport.open()
             except TTransport.TTransportException as e:
-                logger.exception("TTransportException!", exc_info=e)
+                raise e
 
-        if enable_rpc_compression:
+        if self.__enable_rpc_compression:
             self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
         else:
             self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -133,6 +182,7 @@ class Session(object):
         except Exception as e:
             self.__transport.close()
             logger.exception("session closed because: ", exc_info=e)
+            raise e
 
         if self.__zone_id is not None:
             self.set_time_zone(self.__zone_id)
@@ -165,12 +215,21 @@ class Session(object):
         set one storage group
         :param group_name: String, storage group name (starts from root)
         """
-        status = self.__client.setStorageGroup(self.__session_id, group_name)
-        logger.debug(
-            "setting storage group {} message: {}".format(group_name, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.setStorageGroup(self.__session_id, group_name)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.setStorageGroup(self.__session_id, group_name)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("create databases fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_storage_group(self, storage_group):
         """
@@ -185,14 +244,23 @@ class Session(object):
         delete multiple storage groups.
         :param storage_group_lst: List, paths of the target storage groups.
         """
-        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
-        logger.debug(
-            "delete storage group(s) {} message: {}".format(
-                storage_group_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteStorageGroups(
+                            self.__session_id, storage_group_lst
+                        )
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("delete database fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_time_series(
         self,
@@ -230,12 +298,20 @@ class Session(object):
             attributes,
             alias,
         )
-        status = self.__client.createTimeseries(request)
-        logger.debug(
-            "creating time series {} message: {}".format(ts_path, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_aligned_time_series(
         self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -260,14 +336,21 @@ class Session(object):
             encoding_lst,
             compressor_lst,
         )
-        status = self.__client.createAlignedTimeseries(request)
-        logger.debug(
-            "creating aligned time series of device {} message: {}".format(
-                measurements_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.createAlignedTimeseries(request)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createAlignedTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            raise e
 
     def create_multi_time_series(
         self,
@@ -306,28 +389,39 @@ class Session(object):
             attributes_lst,
             alias_lst,
         )
-        status = self.__client.createMultiTimeseries(request)
-        logger.debug(
-            "creating multiple time series {} message: {}".format(
-                ts_path_lst, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createMultiTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createMultiTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating multi time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_time_series(self, paths_list):
         """
         delete multiple time series, including data and schema
         :param paths_list: List of time series path, which should be complete (starts from root)
         """
-        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
-        logger.debug(
-            "deleting multiple time series {} message: {}".format(
-                paths_list, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteTimeseries(self.__session_id, paths_list)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("deleting time series fails because: ", e1)
+                    raise e1
 
     def check_time_series_exists(self, path):
         """
@@ -346,14 +440,21 @@ class Session(object):
         :param paths_list: time series list that the data in.
         :param end_time: data with time stamp less than or equal to time will be deleted.
         """
-        request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+        request = TSDeleteDataReq(
+            self.__session_id, paths_list, -9223372036854775808, end_time
+        )
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_data_in_range(self, paths_list, start_time, end_time):
         """
@@ -364,12 +465,17 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_str_record(self, device_id, timestamp, measurements, string_values):
         """special case for inserting one row of String (TEXT) value"""
@@ -377,18 +483,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values
-        )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
+            device_id, timestamp, measurements, string_values
         )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_str_record(
         self, device_id, timestamp, measurements, string_values
@@ -398,18 +509,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values, True
-        )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
+            device_id, timestamp, measurements, string_values, True
         )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
@@ -427,14 +543,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -455,14 +575,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -482,14 +606,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values, True
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -510,14 +638,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -535,14 +667,19 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.testInsertRecord(request)
-        logger.debug(
-            "testing! insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -563,12 +700,19 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.testInsertRecords(request)
-        logger.debug(
-            "testing! insert multiple records, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecords(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_record_req(
         self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -588,11 +732,11 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+        self, device_id, timestamp, measurements, values, is_aligned=False
     ):
-        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+        if len(values) != len(measurements):
             raise RuntimeError(
-                "length of data types does not equal to length of values!"
+                "length of measurements does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
             self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -649,24 +793,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_tablets(self, tablet_lst):
         """
         insert multiple tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablet(self, tablet):
         """
@@ -680,26 +838,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet, True)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablets(self, tablet_lst):
         """
         insert multiple aligned tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(
-            self.gen_insert_tablets_req(tablet_lst, True)
-        )
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst, True)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -752,12 +922,22 @@ class Session(object):
         request = self.gen_insert_records_of_one_device_request(
             device_id, times_list, measurements_list, values_list, types_list
         )
-
-        # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -811,10 +991,22 @@ class Session(object):
         )
 
         # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_records_of_one_device_request(
         self,
@@ -852,14 +1044,21 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
-        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "testing! insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablet_req(tablet)
+            return Session.verify_success(self.__client.testInsertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.testInsertTablet(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_tablets(self, tablet_list):
         """
@@ -867,14 +1066,20 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
-        status = self.__client.testInsertTablets(
-            self.gen_insert_tablets_req(tablet_list)
-        )
-        logger.debug(
-            "testing! insert multiple tablets, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablets_req(tablet_list)
+            return Session.verify_success(self.__client.testInsertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertTablets(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -926,19 +1131,43 @@ class Session(object):
         request = TSExecuteStatementReq(
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
-        resp = self.__client.executeQueryStatement(request)
-        return SessionDataSet(
-            sql,
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeQueryStatement(request)
+            return SessionDataSet(
+                sql,
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeQueryStatement(request)
+                    return SessionDataSet(
+                        sql,
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def execute_non_query_statement(self, sql):
         """
@@ -949,12 +1178,22 @@ class Session(object):
         try:
             resp = self.__client.executeUpdateStatement(request)
             status = resp.status
-            logger.debug(
-                "execute non-query statement {} message: {}".format(sql, status.message)
-            )
             return Session.verify_success(status)
         except TTransport.TException as e:
-            raise RuntimeError("execution of non-query statement fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeUpdateStatement(request)
+                    status = resp.status
+                    return Session.verify_success(status)
+                except TTransport.TException as e1:
+                    logger.exception(
+                        "execution of non-query statement fails because: ", e1
+                    )
+                    raise e1
+            else:
+                raise e
 
     @staticmethod
     def value_to_bytes(data_types, values):
@@ -1075,7 +1314,32 @@ class Session(object):
             statementId=self.__statement_id,
             enableRedirectQuery=False,
         )
-        resp = self.__client.executeRawDataQuery(request)
+        try:
+            resp = self.__client.executeRawDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeRawDataQuery(request)
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+        Session.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1104,8 +1368,32 @@ class Session(object):
             self.__statement_id,
             enableRedirectQuery=False,
         )
-
-        resp = self.__client.executeLastDataQuery(request)
+        try:
+            resp = self.__client.executeLastDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeLastDataQuery(request)
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+        Session.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1118,3 +1406,131 @@ class Session(object):
             resp.queryDataSet,
             resp.ignoreTimeStamp,
         )
+
+    def insert_string_records_of_one_device(
+        self,
+        device_id: str,
+        times: list,
+        measurements_list: list,
+        values_list: list,
+        have_sorted: bool = False,
+    ):
+        """
+        insert multiple row of string record into database:
+                 timestamp,     m1,    m2,     m3
+                         0,  text1,  text2, text3
+        :param device_id: String, device id
+        :param times: Timestamp list
+        :param measurements_list: Measurements list
+        :param values_list: Value list
+        :param have_sorted: have these list been sorted by timestamp
+        """
+        if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList and valuesList's size should be equal!"
+            )
+        request = self.gen_insert_string_records_of_one_device_request(
+            device_id, times, measurements_list, values_list, have_sorted, False
+        )
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+
+    def insert_aligned_string_records_of_one_device(
+        self,
+        device_id: str,
+        times: list,
+        measurements_list: list,
+        values: list,
+        have_sorted: bool = False,
+    ):
+        if (len(times) != len(measurements_list)) or (len(times) != len(values)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList and valuesList's size should be equal!"
+            )
+        request = self.gen_insert_string_records_of_one_device_request(
+            device_id, times, measurements_list, values, have_sorted, True
+        )
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
+
+    def reconnect(self):
+        if self.__hosts is None:
+            return False
+        connected = False
+        for i in range(1, self.RETRY_NUM + 1):
+            if self.__transport is not None:
+                self.__transport.close()
+                curr_host_index = random.randint(0, len(self.__hosts))
+                try_host_num = 0
+                for j in range(curr_host_index, len(self.__hosts)):
+                    if try_host_num == len(self.__hosts):
+                        break
+                    self.__default_endpoint = EndPoint(self.__hosts[j], self.__ports[j])
+                    if j == len(self.__hosts) - 1:
+                        j = -1
+                    try_host_num += 1
+                    try:
+                        self.init_connection(self.__default_endpoint)
+                        connected = True
+                    except TTransport.TException as e:
+                        continue
+                    break
+            if connected:
+                break
+        return connected
+
+    def gen_insert_string_records_of_one_device_request(
+        self,
+        device_id,
+        times,
+        measurements_list,
+        values_list,
+        have_sorted,
+        is_aligned=False,
+    ):
+        if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList and valuesList's size should be equal!"
+            )
+        if not Session.check_sorted(times):
+            # sort by timestamp
+            sorted_zipped = sorted(zip(times, measurements_list, values_list))
+            result = zip(*sorted_zipped)
+            times_list, measurements_list, values_list = [list(x) for x in result]
+        request = TSInsertStringRecordsOfOneDeviceReq(
+            self.__session_id,
+            device_id,
+            measurements_list,
+            values_list,
+            times,
+            is_aligned,
+        )
+        return request
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index f957c2da42..05aa8eb141 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -67,11 +67,12 @@ def test_non_time_query():
         "tags",
         "attributes",
         "deadband",
-        "deadband parameters"
+        "deadband parameters",
     ]
     assert_array_equal(
         df.values,
-        [[
+        [
+            [
                 "root.device1.pressure",
                 None,
                 "root.device1",
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 7e9df9ac7e..b0a9f014c2 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -114,7 +114,9 @@ def test_delete_date():
             print_message("insert aligned record of one device failed")
 
         # execute delete data
-        session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+        session.delete_data(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_query_statement(
@@ -140,7 +142,9 @@ def test_delete_date():
         assert actual_count == expect_count
 
         # execute delete data
-        session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+        session.delete_data_in_range(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_query_statement(
@@ -175,4 +179,3 @@ else:
     print("Some test failed, please have a check")
     print("failed count: ", failed_count)
     exit(1)
-
diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md
index 4c4ee5e9c6..99ac94cfe6 100644
--- a/docs/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/UserGuide/API/Programming-Python-Native-API.md
@@ -59,7 +59,26 @@ session.close()
 * Initialize a Session
 
 ```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+    ip="127.0.0.1",
+    port="6667",
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
+```
+
+* Initialize a Session to connect multiple nodes
+
+```python
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
 ```
 
 * Open a session, with a parameter to specify whether to enable RPC compression
diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
index 5038e023b1..7df7994abf 100644
--- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
@@ -60,7 +60,26 @@ session.close()
 * 初始化 Session
 
 ```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+    ip="127.0.0.1",
+    port="6667",
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
+```
+
+* 初始化可连接多节点的 Session
+
+```python
+session = Session.init_from_node_urls(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+    user="root",
+    password="root",
+    fetch_size=1024,
+    zone_id="UTC+8"
+)
 ```
 
 * 开启 Session,并决定是否开启 RPC 压缩