You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/21 10:27:08 UTC

[iotdb] 01/01: Support connecting multiple nodes in python api

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch py_reconnect
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ed95d9efe8dfff8fd7768ca13ec31d9849a20e1
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 21 18:24:39 2023 +0800

    Support connecting multiple nodes in python api
---
 client-py/SessionExample.py                      |   7 +-
 client-py/iotdb/Session.py                       | 800 ++++++++++++++++-------
 client-py/iotdb/utils/IoTDBRpcDataSet.py         |   6 +-
 client-py/iotdb/utils/NumpyTablet.py             |   4 +-
 client-py/tests/tablet_performance_comparison.py |   4 +-
 client-py/tests/test_dataframe.py                |   6 +-
 client-py/tests/test_delete_data.py              |   8 +-
 client-py/tests/test_numpy_tablet.py             |   7 +-
 client-py/tests/test_session.py                  |   7 +-
 client-py/tests/test_template.py                 |  12 +-
 10 files changed, 608 insertions(+), 253 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 1323bf4ef1..5e24e3c41d 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -239,7 +239,12 @@ np_bitmaps_[2].mark(2)
 np_bitmaps_[4].mark(3)
 np_bitmaps_[5].mark(3)
 np_tablet_with_none = NumpyTablet(
-    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+    "root.sg_test_01.d_02",
+    measurements_,
+    data_types_,
+    np_values_,
+    np_timestamps_,
+    np_bitmaps_,
 )
 session.insert_tablet(np_tablet_with_none)
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 1a4c41326d..56359ae3fd 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,6 +16,7 @@
 # under the License.
 #
 import logging
+import random
 import struct
 import time
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
@@ -24,6 +25,7 @@ from thrift.transport import TSocket, TTransport
 from iotdb.utils.SessionDataSet import SessionDataSet
 from .template.Template import Template
 from .template.TemplateQueryType import TemplateQueryType
+from .thrift.common.ttypes import TEndPoint
 from .thrift.rpc.IClientRPCService import (
     Client,
     TSCreateTimeseriesReq,
@@ -54,6 +56,7 @@ from .thrift.rpc.ttypes import (
     TSLastDataQueryReq,
     TSInsertStringRecordsOfOneDeviceReq,
 )
+
 # for debug
 # from IoTDBConstants import *
 # from SessionDataSet import SessionDataSet
@@ -72,23 +75,34 @@ logger = logging.getLogger("IoTDB")
 
 class Session(object):
     SUCCESS_STATUS = 200
+    MULTIPLE_ERROR = 302
     REDIRECTION_RECOMMEND = 400
     DEFAULT_FETCH_SIZE = 10000
     DEFAULT_USER = "root"
     DEFAULT_PASSWORD = "root"
     DEFAULT_ZONE_ID = time.strftime("%z")
+    RETRY_NUM = 3
 
     def __init__(
         self,
-        host,
-        port,
+        hosts,
+        ports,
         user=DEFAULT_USER,
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
     ):
-        self.__host = host
-        self.__port = port
+        if isinstance(hosts, list):
+            self.__hosts = hosts
+            self.__host = hosts[0]
+        else:
+            self.__host = hosts
+        if isinstance(ports, list):
+            self.__ports = ports
+            self.__port = ports[0]
+        else:
+            self.__port = ports
+        self.__default_endpoint = TEndPoint(self.__host, self.__port)
         self.__user = user
         self.__password = password
         self.__fetch_size = fetch_size
@@ -99,12 +113,17 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__enable_rpc_compression = None
 
     def open(self, enable_rpc_compression=False):
+        self.__enable_rpc_compression = enable_rpc_compression
+        self.init(self.__default_endpoint)
+
+    def init(self, endpoint):
         if not self.__is_close:
             return
         self.__transport = TTransport.TFramedTransport(
-            TSocket.TSocket(self.__host, self.__port)
+            TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
         if not self.__transport.isOpen():
@@ -112,8 +131,9 @@ class Session(object):
                 self.__transport.open()
             except TTransport.TTransportException as e:
                 logger.exception("TTransportException!", exc_info=e)
+                raise e
 
-        if enable_rpc_compression:
+        if self.__enable_rpc_compression:
             self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
         else:
             self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -145,6 +165,7 @@ class Session(object):
         except Exception as e:
             self.__transport.close()
             logger.exception("session closed because: ", exc_info=e)
+            raise e
 
         if self.__zone_id is not None:
             self.set_time_zone(self.__zone_id)
@@ -177,12 +198,21 @@ class Session(object):
         create one database
         :param group_name: String, database name (starts from root)
         """
-        status = self.__client.setStorageGroup(self.__session_id, group_name)
-        logger.debug(
-            "setting database {} message: {}".format(group_name, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.setStorageGroup(self.__session_id, group_name)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.setStorageGroup(self.__session_id, group_name)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("create databases fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_storage_group(self, storage_group):
         """
@@ -197,14 +227,23 @@ class Session(object):
         delete multiple databases.
         :param storage_group_lst: List, paths of the target databases.
         """
-        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
-        logger.debug(
-            "delete database(s) {} message: {}".format(
-                storage_group_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteStorageGroups(
+                            self.__session_id, storage_group_lst
+                        )
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("delete database fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_time_series(
         self,
@@ -242,12 +281,20 @@ class Session(object):
             attributes,
             alias,
         )
-        status = self.__client.createTimeseries(request)
-        logger.debug(
-            "creating time series {} message: {}".format(ts_path, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_aligned_time_series(
         self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -272,14 +319,21 @@ class Session(object):
             encoding_lst,
             compressor_lst,
         )
-        status = self.__client.createAlignedTimeseries(request)
-        logger.debug(
-            "creating aligned time series of device {} message: {}".format(
-                measurements_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.createAlignedTimeseries(request)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createAlignedTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            raise e
 
     def create_multi_time_series(
         self,
@@ -318,28 +372,39 @@ class Session(object):
             attributes_lst,
             alias_lst,
         )
-        status = self.__client.createMultiTimeseries(request)
-        logger.debug(
-            "creating multiple time series {} message: {}".format(
-                ts_path_lst, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createMultiTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createMultiTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating multi time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_time_series(self, paths_list):
         """
         delete multiple time series, including data and schema
         :param paths_list: List of time series path, which should be complete (starts from root)
         """
-        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
-        logger.debug(
-            "deleting multiple time series {} message: {}".format(
-                paths_list, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteTimeseries(self.__session_id, paths_list)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("deleting time series fails because: ", e1)
+                    raise e1
 
     def check_time_series_exists(self, path):
         """
@@ -358,14 +423,21 @@ class Session(object):
         :param paths_list: time series list that the data in.
         :param end_time: data with time stamp less than or equal to time will be deleted.
         """
-        request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+        request = TSDeleteDataReq(
+            self.__session_id, paths_list, -9223372036854775808, end_time
+        )
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_data_in_range(self, paths_list, start_time, end_time):
         """
@@ -376,12 +448,17 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_str_record(self, device_id, timestamp, measurements, string_values):
         """special case for inserting one row of String (TEXT) value"""
@@ -389,18 +466,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values
+            device_id, timestamp, measurements, string_values
         )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_str_record(
         self, device_id, timestamp, measurements, string_values
@@ -410,18 +492,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values, True
-        )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
+            device_id, timestamp, measurements, string_values, True
         )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
@@ -439,14 +526,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -467,14 +558,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -494,14 +589,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values, True
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -522,14 +621,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -547,14 +650,19 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.testInsertRecord(request)
-        logger.debug(
-            "testing! insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -575,12 +683,19 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.testInsertRecords(request)
-        logger.debug(
-            "testing! insert multiple records, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecords(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_record_req(
         self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -600,11 +715,11 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+        self, device_id, timestamp, measurements, values, is_aligned=False
     ):
-        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+        if len(values) != len(measurements):
             raise RuntimeError(
-                "length of data types does not equal to length of values!"
+                "length of measurements does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
             self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -661,24 +776,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_tablets(self, tablet_lst):
         """
         insert multiple tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablet(self, tablet):
         """
@@ -692,26 +821,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet, True)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablets(self, tablet_lst):
         """
         insert multiple aligned tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(
-            self.gen_insert_tablets_req(tablet_lst, True)
-        )
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst, True)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -764,12 +905,22 @@ class Session(object):
         request = self.gen_insert_records_of_one_device_request(
             device_id, times_list, measurements_list, values_list, types_list
         )
-
-        # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -823,10 +974,22 @@ class Session(object):
         )
 
         # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_records_of_one_device_request(
         self,
@@ -864,14 +1027,21 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
-        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "testing! insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablet_req(tablet)
+            return Session.verify_success(self.__client.testInsertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.testInsertTablet(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_tablets(self, tablet_list):
         """
@@ -879,14 +1049,20 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
-        status = self.__client.testInsertTablets(
-            self.gen_insert_tablets_req(tablet_list)
-        )
-        logger.debug(
-            "testing! insert multiple tablets, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablets_req(tablet_list)
+            return Session.verify_success(self.__client.testInsertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertTablets(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -938,19 +1114,43 @@ class Session(object):
         request = TSExecuteStatementReq(
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
-        resp = self.__client.executeQueryStatement(request)
-        return SessionDataSet(
-            sql,
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeQueryStatement(request)
+            return SessionDataSet(
+                sql,
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeQueryStatement(request)
+                    return SessionDataSet(
+                        sql,
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def execute_non_query_statement(self, sql):
         """
@@ -961,12 +1161,22 @@ class Session(object):
         try:
             resp = self.__client.executeUpdateStatement(request)
             status = resp.status
-            logger.debug(
-                "execute non-query statement {} message: {}".format(sql, status.message)
-            )
             return Session.verify_success(status)
         except TTransport.TException as e:
-            raise RuntimeError("execution of non-query statement fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeUpdateStatement(request)
+                    status = resp.status
+                    return Session.verify_success(status)
+                except TTransport.TException as e1:
+                    logger.exception(
+                        "execution of non-query statement fails because: ", e1
+                    )
+                    raise e1
+            else:
+                raise e
 
     @staticmethod
     def value_to_bytes(data_types, values):
@@ -1050,13 +1260,32 @@ class Session(object):
         verify success of operation
         :param status: execution result status
         """
-        if status.code == Session.SUCCESS_STATUS or status.code == Session.REDIRECTION_RECOMMEND:
+        if status.code == Session.MULTIPLE_ERROR:
+            Session.verify_success_by_list(status.subStatus)
+            return 0
+        if (
+            status.code == Session.SUCCESS_STATUS
+            or status.code == Session.REDIRECTION_RECOMMEND
+        ):
             return 0
 
         logger.error("error status is %s", status)
-        raise RuntimeError(
-            "execution of statement fails because: " + status.message
-        )
+        raise RuntimeError(status.code + ": " + status.message)
+
+    @staticmethod
+    def verify_success_by_list(status_list):
+        """
+        verify a list of sub-statuses; returns normally when none of them failed
+        :param status_list: sub-statuses of a MULTIPLE_ERROR result, may be None
+        :raises RuntimeError: carrying the messages of all failed sub-statuses
+        """
+        ok_codes = (Session.SUCCESS_STATUS, Session.REDIRECTION_RECOMMEND)
+        # a missing or empty list means there is nothing that could have failed
+        failed = [s.message for s in (status_list or ()) if s.code not in ok_codes]
+        # raise only on a real failure so the caller's "return 0" stays reachable
+        if failed:
+            joined = "; ".join(str(m) for m in failed)
+            raise RuntimeError(str(Session.MULTIPLE_ERROR) + ": " + joined + "; ")
 
     def execute_raw_data_query(
         self, paths: list, start_time: int, end_time: int
@@ -1077,19 +1306,43 @@ class Session(object):
             statementId=self.__statement_id,
             enableRedirectQuery=False,
         )
-        resp = self.__client.executeRawDataQuery(request)
-        return SessionDataSet(
-            "",
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeRawDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeRawDataQuery(request)
+                    return SessionDataSet(
+                        "",
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet:
         """
@@ -1106,20 +1359,43 @@ class Session(object):
             self.__statement_id,
             enableRedirectQuery=False,
         )
-
-        resp = self.__client.executeLastDataQuery(request)
-        return SessionDataSet(
-            "",
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeLastDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeLastDataQuery(request)
+                    return SessionDataSet(
+                        "",
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_string_records_of_one_device(
         self,
@@ -1146,12 +1422,22 @@ class Session(object):
         request = self.gen_insert_string_records_of_one_device_request(
             device_id, times, measurements_list, values_list, have_sorted, False
         )
-        status = self.__client.insertStringRecordsOfOneDevice(request)
-        logger.debug(
-            "insert one device {} message: {}".format(device_id, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_string_records_of_one_device(
         self,
@@ -1168,12 +1454,50 @@ class Session(object):
         request = self.gen_insert_string_records_of_one_device_request(
             device_id, times, measurements_list, values, have_sorted, True
         )
-        status = self.__client.insertStringRecordsOfOneDevice(request)
-        logger.debug(
-            "insert one device {} message: {}".format(device_id, status.message)
-        )
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
-        return Session.verify_success(status)
+    def reconnect(self):
+        """
+        try to re-initialise the session on one of the configured nodes
+        :return: True if init() succeeded on one of the nodes, False otherwise
+        """
+        if not self.__hosts:
+            return False
+        host_num = len(self.__hosts)
+        for _ in range(self.RETRY_NUM):
+            if self.__transport is None:
+                return False
+            self.__transport.close()
+            # randrange excludes its upper bound, so start is always a valid index
+            start = random.randrange(host_num)
+            # visit every node once, wrapping around from the random start
+            for offset in range(host_num):
+                idx = (start + offset) % host_num
+                self.__default_endpoint = TEndPoint(
+                    self.__hosts[idx], self.__ports[idx]
+                )
+                try:
+                    self.init(self.__default_endpoint)
+                    return True
+                except TTransport.TException:
+                    # init failed on this node, move on to the next one
+                    continue
+        return False
 
     def gen_insert_string_records_of_one_device_request(
         self,
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index e9dfa7bfe6..6d31629340 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -243,7 +243,9 @@ class IoTDBRpcDataSet(object):
                 if len(data_array) < total_length:
                     if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
                         tmp_array = np.full(total_length, np.nan, np.float32)
-                    elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
+                    elif (
+                        data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE
+                    ):
                         tmp_array = np.full(total_length, np.nan, data_array.dtype)
                     elif data_type == TSDataType.BOOLEAN:
                         tmp_array = np.full(total_length, np.nan, np.float32)
@@ -252,7 +254,7 @@ class IoTDBRpcDataSet(object):
 
                     bitmap_buffer = self.__query_data_set.bitmapList[location]
                     bitmap_str = self._to_bitstring(bitmap_buffer)
-                    bit_mask = (np.fromstring(bitmap_str, 'u1') - ord('0')).astype(bool)
+                    bit_mask = (np.fromstring(bitmap_str, "u1") - ord("0")).astype(bool)
                     if len(bit_mask) != total_length:
                         bit_mask = bit_mask[:total_length]
                     tmp_array[bit_mask] = data_array
diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py
index 7315b872e5..dbb0d354db 100644
--- a/client-py/iotdb/utils/NumpyTablet.py
+++ b/client-py/iotdb/utils/NumpyTablet.py
@@ -22,7 +22,9 @@ from iotdb.utils.BitMap import BitMap
 
 
 class NumpyTablet(object):
-    def __init__(self, device_id, measurements, data_types, values, timestamps, bitmaps=None):
+    def __init__(
+        self, device_id, measurements, data_types, values, timestamps, bitmaps=None
+    ):
         """
         creating a numpy tablet for insertion
           for example, considering device: root.sg1.d1
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
index ef5847140c..3626e818a8 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -75,9 +75,9 @@ def generate_csv_data(
         if _type == TSDataType.BOOLEAN:
             return [random.randint(0, 1) == 1 for _ in range(_row)]
         elif _type == TSDataType.INT32:
-            return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)]
+            return [random.randint(-(2**31), 2**31) for _ in range(_row)]
         elif _type == TSDataType.INT64:
-            return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)]
+            return [random.randint(-(2**63), 2**63) for _ in range(_row)]
         elif _type == TSDataType.FLOAT:
             return [1.5 for _ in range(_row)]
         elif _type == TSDataType.DOUBLE:
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index e7492a3b14..801b7c1bcd 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -53,7 +53,9 @@ def test_non_time_query():
         session.insert_str_record("root.device0", 123, "pressure", "15.0")
 
         # Read
-        session_data_set = session.execute_query_statement("SHOW TIMESERIES root.device0.*")
+        session_data_set = session.execute_query_statement(
+            "SHOW TIMESERIES root.device0.*"
+        )
         df = session_data_set.todf()
 
         session.close()
@@ -68,7 +70,7 @@ def test_non_time_query():
         "Tags",
         "Attributes",
         "Deadband",
-        "DeadbandParameters"
+        "DeadbandParameters",
     ]
     assert_array_equal(
         df.values,
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 031e8ef6e6..8f44574726 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -102,7 +102,9 @@ def test_delete_date():
             print_message("insert aligned record of one device failed")
 
         # execute delete data
-        session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+        session.delete_data(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_raw_data_query(
@@ -127,7 +129,9 @@ def test_delete_date():
         assert actual_count == expect_count
 
         # execute delete data
-        session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+        session.delete_data_in_range(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_raw_data_query(
diff --git a/client-py/tests/test_numpy_tablet.py b/client-py/tests/test_numpy_tablet.py
index ddcb17b070..217df22ea2 100644
--- a/client-py/tests/test_numpy_tablet.py
+++ b/client-py/tests/test_numpy_tablet.py
@@ -100,7 +100,12 @@ def test_numpy_tablet_with_none_serialization():
     np_bitmaps_[4].mark(3)
     np_bitmaps_[5].mark(3)
     np_tablet_ = NumpyTablet(
-        "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+        "root.sg_test_01.d_01",
+        measurements_,
+        data_types_,
+        np_values_,
+        np_timestamps_,
+        np_bitmaps_,
     )
     assert tablet_.get_binary_timestamps() == np_tablet_.get_binary_timestamps()
     assert tablet_.get_binary_values() == np_tablet_.get_binary_values()
diff --git a/client-py/tests/test_session.py b/client-py/tests/test_session.py
index 6fbddb9820..8a466b8717 100644
--- a/client-py/tests/test_session.py
+++ b/client-py/tests/test_session.py
@@ -311,7 +311,12 @@ def test_session():
         np_bitmaps_[4].mark(3)
         np_bitmaps_[5].mark(3)
         np_tablet_ = NumpyTablet(
-            "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+            "root.sg_test_01.d_01",
+            measurements_,
+            data_types_,
+            np_values_,
+            np_timestamps_,
+            np_bitmaps_,
         )
         if session.insert_tablet(np_tablet_) < 0:
             test_fail()
diff --git a/client-py/tests/test_template.py b/client-py/tests/test_template.py
index e91a8a0990..93cfca40cf 100644
--- a/client-py/tests/test_template.py
+++ b/client-py/tests/test_template.py
@@ -92,12 +92,18 @@ def test_set_template():
         session.execute_non_query_statement("CREATE DATABASE root.python")
 
         session.set_schema_template(template_name, "root.python.GPS")
-        session.execute_non_query_statement("create timeseries of schema template on root.python.GPS")
+        session.execute_non_query_statement(
+            "create timeseries of schema template on root.python.GPS"
+        )
 
         assert session.show_paths_template_set_on(template_name) == ["root.python.GPS"]
-        assert session.show_paths_template_using_on(template_name) == ["root.python.GPS"]
+        assert session.show_paths_template_using_on(template_name) == [
+            "root.python.GPS"
+        ]
 
-        session.execute_non_query_statement("delete timeseries of schema template from root.python.GPS")
+        session.execute_non_query_statement(
+            "delete timeseries of schema template from root.python.GPS"
+        )
 
         session.unset_schema_template(template_name, "root.python.GPS")
         session.drop_schema_template(template_name)