You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/21 10:27:07 UTC

[iotdb] branch py_reconnect created (now 0ed95d9efe)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch py_reconnect
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 0ed95d9efe Support connecting multiple nodes in python api

This branch includes the following new commits:

     new 0ed95d9efe Support connecting multiple nodes in python api

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Support connecting multiple nodes in python api

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch py_reconnect
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ed95d9efe8dfff8fd7768ca13ec31d9849a20e1
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 21 18:24:39 2023 +0800

    Support connecting multiple nodes in python api
---
 client-py/SessionExample.py                      |   7 +-
 client-py/iotdb/Session.py                       | 800 ++++++++++++++++-------
 client-py/iotdb/utils/IoTDBRpcDataSet.py         |   6 +-
 client-py/iotdb/utils/NumpyTablet.py             |   4 +-
 client-py/tests/tablet_performance_comparison.py |   4 +-
 client-py/tests/test_dataframe.py                |   6 +-
 client-py/tests/test_delete_data.py              |   8 +-
 client-py/tests/test_numpy_tablet.py             |   7 +-
 client-py/tests/test_session.py                  |   7 +-
 client-py/tests/test_template.py                 |  12 +-
 10 files changed, 608 insertions(+), 253 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 1323bf4ef1..5e24e3c41d 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -239,7 +239,12 @@ np_bitmaps_[2].mark(2)
 np_bitmaps_[4].mark(3)
 np_bitmaps_[5].mark(3)
 np_tablet_with_none = NumpyTablet(
-    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+    "root.sg_test_01.d_02",
+    measurements_,
+    data_types_,
+    np_values_,
+    np_timestamps_,
+    np_bitmaps_,
 )
 session.insert_tablet(np_tablet_with_none)
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 1a4c41326d..56359ae3fd 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,6 +16,7 @@
 # under the License.
 #
 import logging
+import random
 import struct
 import time
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
@@ -24,6 +25,7 @@ from thrift.transport import TSocket, TTransport
 from iotdb.utils.SessionDataSet import SessionDataSet
 from .template.Template import Template
 from .template.TemplateQueryType import TemplateQueryType
+from .thrift.common.ttypes import TEndPoint
 from .thrift.rpc.IClientRPCService import (
     Client,
     TSCreateTimeseriesReq,
@@ -54,6 +56,7 @@ from .thrift.rpc.ttypes import (
     TSLastDataQueryReq,
     TSInsertStringRecordsOfOneDeviceReq,
 )
+
 # for debug
 # from IoTDBConstants import *
 # from SessionDataSet import SessionDataSet
@@ -72,23 +75,34 @@ logger = logging.getLogger("IoTDB")
 
 class Session(object):
     SUCCESS_STATUS = 200
+    MULTIPLE_ERROR = 302
     REDIRECTION_RECOMMEND = 400
     DEFAULT_FETCH_SIZE = 10000
     DEFAULT_USER = "root"
     DEFAULT_PASSWORD = "root"
     DEFAULT_ZONE_ID = time.strftime("%z")
+    RETRY_NUM = 3
 
     def __init__(
         self,
-        host,
-        port,
+        hosts,
+        ports,
         user=DEFAULT_USER,
         password=DEFAULT_PASSWORD,
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
     ):
-        self.__host = host
-        self.__port = port
+        if isinstance(hosts, list):
+            self.__hosts = hosts
+            self.__host = hosts[0]
+        else:
+            self.__host = hosts
+        if isinstance(ports, list):
+            self.__ports = ports
+            self.__port = ports[0]
+        else:
+            self.__port = ports
+        self.__default_endpoint = TEndPoint(self.__host, self.__port)
         self.__user = user
         self.__password = password
         self.__fetch_size = fetch_size
@@ -99,12 +113,17 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__enable_rpc_compression = None
 
     def open(self, enable_rpc_compression=False):
+        self.__enable_rpc_compression = enable_rpc_compression
+        self.init(self.__default_endpoint)
+
+    def init(self, endpoint):
         if not self.__is_close:
             return
         self.__transport = TTransport.TFramedTransport(
-            TSocket.TSocket(self.__host, self.__port)
+            TSocket.TSocket(endpoint.ip, endpoint.port)
         )
 
         if not self.__transport.isOpen():
@@ -112,8 +131,9 @@ class Session(object):
                 self.__transport.open()
             except TTransport.TTransportException as e:
                 logger.exception("TTransportException!", exc_info=e)
+                raise e
 
-        if enable_rpc_compression:
+        if self.__enable_rpc_compression:
             self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
         else:
             self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -145,6 +165,7 @@ class Session(object):
         except Exception as e:
             self.__transport.close()
             logger.exception("session closed because: ", exc_info=e)
+            raise e
 
         if self.__zone_id is not None:
             self.set_time_zone(self.__zone_id)
@@ -177,12 +198,21 @@ class Session(object):
         create one database
         :param group_name: String, database name (starts from root)
         """
-        status = self.__client.setStorageGroup(self.__session_id, group_name)
-        logger.debug(
-            "setting database {} message: {}".format(group_name, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.setStorageGroup(self.__session_id, group_name)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.setStorageGroup(self.__session_id, group_name)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("create databases fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_storage_group(self, storage_group):
         """
@@ -197,14 +227,23 @@ class Session(object):
         delete multiple databases.
         :param storage_group_lst: List, paths of the target databases.
         """
-        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
-        logger.debug(
-            "delete database(s) {} message: {}".format(
-                storage_group_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteStorageGroups(
+                            self.__session_id, storage_group_lst
+                        )
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("delete database fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_time_series(
         self,
@@ -242,12 +281,20 @@ class Session(object):
             attributes,
             alias,
         )
-        status = self.__client.createTimeseries(request)
-        logger.debug(
-            "creating time series {} message: {}".format(ts_path, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def create_aligned_time_series(
         self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -272,14 +319,21 @@ class Session(object):
             encoding_lst,
             compressor_lst,
         )
-        status = self.__client.createAlignedTimeseries(request)
-        logger.debug(
-            "creating aligned time series of device {} message: {}".format(
-                measurements_lst, status.message
+        try:
+            return Session.verify_success(
+                self.__client.createAlignedTimeseries(request)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createAlignedTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating time series fails because: ", e1)
+                    raise e1
+            raise e
 
     def create_multi_time_series(
         self,
@@ -318,28 +372,39 @@ class Session(object):
             attributes_lst,
             alias_lst,
         )
-        status = self.__client.createMultiTimeseries(request)
-        logger.debug(
-            "creating multiple time series {} message: {}".format(
-                ts_path_lst, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.createMultiTimeseries(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.createMultiTimeseries(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("creating multi time series fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_time_series(self, paths_list):
         """
         delete multiple time series, including data and schema
         :param paths_list: List of time series path, which should be complete (starts from root)
         """
-        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
-        logger.debug(
-            "deleting multiple time series {} message: {}".format(
-                paths_list, status.message
+        try:
+            return Session.verify_success(
+                self.__client.deleteTimeseries(self.__session_id, paths_list)
             )
-        )
-
-        return Session.verify_success(status)
+        except TTransport.TException:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.deleteTimeseries(self.__session_id, paths_list)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("deleting time series fails because: ", e1)
+                    raise e1
 
     def check_time_series_exists(self, path):
         """
@@ -358,14 +423,21 @@ class Session(object):
         :param paths_list: time series list that the data in.
         :param end_time: data with time stamp less than or equal to time will be deleted.
         """
-        request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+        request = TSDeleteDataReq(
+            self.__session_id, paths_list, -9223372036854775808, end_time
+        )
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def delete_data_in_range(self, paths_list, start_time, end_time):
         """
@@ -376,12 +448,17 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
         try:
-            status = self.__client.deleteData(request)
-            logger.debug(
-                "delete data from {}, message: {}".format(paths_list, status.message)
-            )
+            return Session.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
-            logger.exception("data deletion fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.deleteData(request))
+                except TTransport.TException as e1:
+                    logger.exception("data deletion fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_str_record(self, device_id, timestamp, measurements, string_values):
         """special case for inserting one row of String (TEXT) value"""
@@ -389,18 +466,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values
+            device_id, timestamp, measurements, string_values
         )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_str_record(
         self, device_id, timestamp, measurements, string_values
@@ -410,18 +492,23 @@ class Session(object):
             string_values = [string_values]
         if type(measurements) == str:
             measurements = [measurements]
-        data_types = [TSDataType.TEXT.value for _ in string_values]
         request = self.gen_insert_str_record_req(
-            device_id, timestamp, measurements, data_types, string_values, True
-        )
-        status = self.__client.insertStringRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
+            device_id, timestamp, measurements, string_values, True
         )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertStringRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
@@ -439,14 +526,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -467,14 +558,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -494,14 +589,18 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values, True
         )
-        status = self.__client.insertRecord(request)
-        logger.debug(
-            "insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecord(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -522,14 +621,18 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst, True
         )
-        status = self.__client.insertRecords(request)
-        logger.debug(
-            "insert multiple records to devices {} message: {}".format(
-                device_ids, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.insertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertRecords(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -547,14 +650,19 @@ class Session(object):
         request = self.gen_insert_record_req(
             device_id, timestamp, measurements, data_types, values
         )
-        status = self.__client.testInsertRecord(request)
-        logger.debug(
-            "testing! insert one record to device {} message: {}".format(
-                device_id, status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecord(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecord(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_records(
         self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -575,12 +683,19 @@ class Session(object):
         request = self.gen_insert_records_req(
             device_ids, times, measurements_lst, type_values_lst, values_lst
         )
-        status = self.__client.testInsertRecords(request)
-        logger.debug(
-            "testing! insert multiple records, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(self.__client.testInsertRecords(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertRecords(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_record_req(
         self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -600,11 +715,11 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+        self, device_id, timestamp, measurements, values, is_aligned=False
     ):
-        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+        if len(values) != len(measurements):
             raise RuntimeError(
-                "length of data types does not equal to length of values!"
+                "length of measurements does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
             self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -661,24 +776,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_tablets(self, tablet_lst):
         """
         insert multiple tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablet(self, tablet):
         """
@@ -692,26 +821,38 @@ class Session(object):
                 The tablet itself is sorted (see docs of Tablet.py)
         :param tablet: a tablet specified above
         """
-        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
-        logger.debug(
-            "insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablet_req(tablet, True)
+        try:
+            return Session.verify_success(self.__client.insertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_tablets(self, tablet_lst):
         """
         insert multiple aligned tablets, tablets are independent to each other
         :param tablet_lst: List of tablets
         """
-        status = self.__client.insertTablets(
-            self.gen_insert_tablets_req(tablet_lst, True)
-        )
-        logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        request = self.gen_insert_tablets_req(tablet_lst, True)
+        try:
+            return Session.verify_success(self.__client.insertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(self.__client.insertTablets(request))
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -764,12 +905,22 @@ class Session(object):
         request = self.gen_insert_records_of_one_device_request(
             device_id, times_list, measurements_list, values_list, types_list
         )
-
-        # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
@@ -823,10 +974,22 @@ class Session(object):
         )
 
         # send request
-        status = self.__client.insertRecordsOfOneDevice(request)
-        logger.debug("insert records of one device, message: {}".format(status.message))
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_records_of_one_device_request(
         self,
@@ -864,14 +1027,21 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet: a tablet of data
         """
-        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
-        logger.debug(
-            "testing! insert one tablet to device {} message: {}".format(
-                tablet.get_device_id(), status.message
-            )
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablet_req(tablet)
+            return Session.verify_success(self.__client.testInsertTablet(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.testInsertTablet(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def test_insert_tablets(self, tablet_list):
         """
@@ -879,14 +1049,20 @@ class Session(object):
          should be used to test other time cost in client
         :param tablet_list: List of tablets
         """
-        status = self.__client.testInsertTablets(
-            self.gen_insert_tablets_req(tablet_list)
-        )
-        logger.debug(
-            "testing! insert multiple tablets, message: {}".format(status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            request = self.gen_insert_tablets_req(tablet_list)
+            return Session.verify_success(self.__client.testInsertTablets(request))
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    return Session.verify_success(
+                        self.__client.testInsertTablets(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("test insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -938,19 +1114,43 @@ class Session(object):
         request = TSExecuteStatementReq(
             self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
         )
-        resp = self.__client.executeQueryStatement(request)
-        return SessionDataSet(
-            sql,
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeQueryStatement(request)
+            return SessionDataSet(
+                sql,
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeQueryStatement(request)
+                    return SessionDataSet(
+                        sql,
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def execute_non_query_statement(self, sql):
         """
@@ -961,12 +1161,22 @@ class Session(object):
         try:
             resp = self.__client.executeUpdateStatement(request)
             status = resp.status
-            logger.debug(
-                "execute non-query statement {} message: {}".format(sql, status.message)
-            )
             return Session.verify_success(status)
         except TTransport.TException as e:
-            raise RuntimeError("execution of non-query statement fails because: ", e)
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeUpdateStatement(request)
+                    status = resp.status
+                    return Session.verify_success(status)
+                except TTransport.TException as e1:
+                    logger.exception(
+                        "execution of non-query statement fails because: ", e1
+                    )
+                    raise e1
+            else:
+                raise e
 
     @staticmethod
     def value_to_bytes(data_types, values):
@@ -1050,13 +1260,32 @@ class Session(object):
         verify success of operation
         :param status: execution result status
         """
-        if status.code == Session.SUCCESS_STATUS or status.code == Session.REDIRECTION_RECOMMEND:
+        if status.code == Session.MULTIPLE_ERROR:
+            Session.verify_success_by_list(status.subStatus)
+            return 0
+        if (
+            status.code == Session.SUCCESS_STATUS
+            or status.code == Session.REDIRECTION_RECOMMEND
+        ):
             return 0
 
         logger.error("error status is %s", status)
-        raise RuntimeError(
-            "execution of statement fails because: " + status.message
-        )
+        raise RuntimeError(status.code + ": " + status.message)
+
+    @staticmethod
+    def verify_success_by_list(status_list):
+        """
+        verify success of operation
+        :param status_list: execution result status
+        """
+        message = str(Session.MULTIPLE_ERROR) + ": "
+        for status in status_list:
+            if (
+                status.code != Session.SUCCESS_STATUS
+                and status.code != Session.REDIRECTION_RECOMMEND
+            ):
+                message += status.message + "; "
+        raise RuntimeError(message)
 
     def execute_raw_data_query(
         self, paths: list, start_time: int, end_time: int
@@ -1077,19 +1306,43 @@ class Session(object):
             statementId=self.__statement_id,
             enableRedirectQuery=False,
         )
-        resp = self.__client.executeRawDataQuery(request)
-        return SessionDataSet(
-            "",
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeRawDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeRawDataQuery(request)
+                    return SessionDataSet(
+                        "",
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet:
         """
@@ -1106,20 +1359,43 @@ class Session(object):
             self.__statement_id,
             enableRedirectQuery=False,
         )
-
-        resp = self.__client.executeLastDataQuery(request)
-        return SessionDataSet(
-            "",
-            resp.columns,
-            resp.dataTypeList,
-            resp.columnNameIndexMap,
-            resp.queryId,
-            self.__client,
-            self.__statement_id,
-            self.__session_id,
-            resp.queryDataSet,
-            resp.ignoreTimeStamp,
-        )
+        try:
+            resp = self.__client.executeLastDataQuery(request)
+            return SessionDataSet(
+                "",
+                resp.columns,
+                resp.dataTypeList,
+                resp.columnNameIndexMap,
+                resp.queryId,
+                self.__client,
+                self.__statement_id,
+                self.__session_id,
+                resp.queryDataSet,
+                resp.ignoreTimeStamp,
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    request.statementId = self.__statement_id
+                    resp = self.__client.executeLastDataQuery(request)
+                    return SessionDataSet(
+                        "",
+                        resp.columns,
+                        resp.dataTypeList,
+                        resp.columnNameIndexMap,
+                        resp.queryId,
+                        self.__client,
+                        self.__statement_id,
+                        self.__session_id,
+                        resp.queryDataSet,
+                        resp.ignoreTimeStamp,
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("execution of query statement fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_string_records_of_one_device(
         self,
@@ -1146,12 +1422,22 @@ class Session(object):
         request = self.gen_insert_string_records_of_one_device_request(
             device_id, times, measurements_list, values_list, have_sorted, False
         )
-        status = self.__client.insertStringRecordsOfOneDevice(request)
-        logger.debug(
-            "insert one device {} message: {}".format(device_id, status.message)
-        )
-
-        return Session.verify_success(status)
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
     def insert_aligned_string_records_of_one_device(
         self,
@@ -1168,12 +1454,50 @@ class Session(object):
         request = self.gen_insert_string_records_of_one_device_request(
             device_id, times, measurements_list, values, have_sorted, True
         )
-        status = self.__client.insertStringRecordsOfOneDevice(request)
-        logger.debug(
-            "insert one device {} message: {}".format(device_id, status.message)
-        )
+        try:
+            return Session.verify_success(
+                self.__client.insertStringRecordsOfOneDevice(request)
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return Session.verify_success(
+                        self.__client.insertStringRecordsOfOneDevice(request)
+                    )
+                except TTransport.TException as e1:
+                    logger.exception("insert fails because: ", e1)
+                    raise e1
+            else:
+                raise e
 
-        return Session.verify_success(status)
+    def reconnect(self):
+        if self.__hosts is None:
+            return False
+        connected = False
+        for i in range(1, self.RETRY_NUM):
+            if self.__transport is not None:
+                self.__transport.close()
+                curr_host_index = random.randint(0, len(self.__hosts))
+                try_host_num = 0
+                for j in range(curr_host_index, len(self.__hosts)):
+                    if try_host_num == len(self.__hosts):
+                        break
+                    self.__default_endpoint = TEndPoint(
+                        self.__hosts[j], self.__ports[j]
+                    )
+                    if j == len(self.__hosts) - 1:
+                        j = -1
+                    try_host_num += 1
+                    try:
+                        self.init(self.__default_endpoint)
+                        connected = True
+                    except TTransport.TException as e:
+                        continue
+                    break
+            if connected:
+                break
+        return connected
 
     def gen_insert_string_records_of_one_device_request(
         self,
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index e9dfa7bfe6..6d31629340 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -243,7 +243,9 @@ class IoTDBRpcDataSet(object):
                 if len(data_array) < total_length:
                     if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
                         tmp_array = np.full(total_length, np.nan, np.float32)
-                    elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
+                    elif (
+                        data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE
+                    ):
                         tmp_array = np.full(total_length, np.nan, data_array.dtype)
                     elif data_type == TSDataType.BOOLEAN:
                         tmp_array = np.full(total_length, np.nan, np.float32)
@@ -252,7 +254,7 @@ class IoTDBRpcDataSet(object):
 
                     bitmap_buffer = self.__query_data_set.bitmapList[location]
                     bitmap_str = self._to_bitstring(bitmap_buffer)
-                    bit_mask = (np.fromstring(bitmap_str, 'u1') - ord('0')).astype(bool)
+                    bit_mask = (np.fromstring(bitmap_str, "u1") - ord("0")).astype(bool)
                     if len(bit_mask) != total_length:
                         bit_mask = bit_mask[:total_length]
                     tmp_array[bit_mask] = data_array
diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py
index 7315b872e5..dbb0d354db 100644
--- a/client-py/iotdb/utils/NumpyTablet.py
+++ b/client-py/iotdb/utils/NumpyTablet.py
@@ -22,7 +22,9 @@ from iotdb.utils.BitMap import BitMap
 
 
 class NumpyTablet(object):
-    def __init__(self, device_id, measurements, data_types, values, timestamps, bitmaps=None):
+    def __init__(
+        self, device_id, measurements, data_types, values, timestamps, bitmaps=None
+    ):
         """
         creating a numpy tablet for insertion
           for example, considering device: root.sg1.d1
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
index ef5847140c..3626e818a8 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -75,9 +75,9 @@ def generate_csv_data(
         if _type == TSDataType.BOOLEAN:
             return [random.randint(0, 1) == 1 for _ in range(_row)]
         elif _type == TSDataType.INT32:
-            return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)]
+            return [random.randint(-(2**31), 2**31) for _ in range(_row)]
         elif _type == TSDataType.INT64:
-            return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)]
+            return [random.randint(-(2**63), 2**63) for _ in range(_row)]
         elif _type == TSDataType.FLOAT:
             return [1.5 for _ in range(_row)]
         elif _type == TSDataType.DOUBLE:
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index e7492a3b14..801b7c1bcd 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -53,7 +53,9 @@ def test_non_time_query():
         session.insert_str_record("root.device0", 123, "pressure", "15.0")
 
         # Read
-        session_data_set = session.execute_query_statement("SHOW TIMESERIES root.device0.*")
+        session_data_set = session.execute_query_statement(
+            "SHOW TIMESERIES root.device0.*"
+        )
         df = session_data_set.todf()
 
         session.close()
@@ -68,7 +70,7 @@ def test_non_time_query():
         "Tags",
         "Attributes",
         "Deadband",
-        "DeadbandParameters"
+        "DeadbandParameters",
     ]
     assert_array_equal(
         df.values,
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 031e8ef6e6..8f44574726 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -102,7 +102,9 @@ def test_delete_date():
             print_message("insert aligned record of one device failed")
 
         # execute delete data
-        session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+        session.delete_data(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_raw_data_query(
@@ -127,7 +129,9 @@ def test_delete_date():
         assert actual_count == expect_count
 
         # execute delete data
-        session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+        session.delete_data_in_range(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+        )
 
         # execute raw data query sql statement
         session_data_set = session.execute_raw_data_query(
diff --git a/client-py/tests/test_numpy_tablet.py b/client-py/tests/test_numpy_tablet.py
index ddcb17b070..217df22ea2 100644
--- a/client-py/tests/test_numpy_tablet.py
+++ b/client-py/tests/test_numpy_tablet.py
@@ -100,7 +100,12 @@ def test_numpy_tablet_with_none_serialization():
     np_bitmaps_[4].mark(3)
     np_bitmaps_[5].mark(3)
     np_tablet_ = NumpyTablet(
-        "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+        "root.sg_test_01.d_01",
+        measurements_,
+        data_types_,
+        np_values_,
+        np_timestamps_,
+        np_bitmaps_,
     )
     assert tablet_.get_binary_timestamps() == np_tablet_.get_binary_timestamps()
     assert tablet_.get_binary_values() == np_tablet_.get_binary_values()
diff --git a/client-py/tests/test_session.py b/client-py/tests/test_session.py
index 6fbddb9820..8a466b8717 100644
--- a/client-py/tests/test_session.py
+++ b/client-py/tests/test_session.py
@@ -311,7 +311,12 @@ def test_session():
         np_bitmaps_[4].mark(3)
         np_bitmaps_[5].mark(3)
         np_tablet_ = NumpyTablet(
-            "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+            "root.sg_test_01.d_01",
+            measurements_,
+            data_types_,
+            np_values_,
+            np_timestamps_,
+            np_bitmaps_,
         )
         if session.insert_tablet(np_tablet_) < 0:
             test_fail()
diff --git a/client-py/tests/test_template.py b/client-py/tests/test_template.py
index e91a8a0990..93cfca40cf 100644
--- a/client-py/tests/test_template.py
+++ b/client-py/tests/test_template.py
@@ -92,12 +92,18 @@ def test_set_template():
         session.execute_non_query_statement("CREATE DATABASE root.python")
 
         session.set_schema_template(template_name, "root.python.GPS")
-        session.execute_non_query_statement("create timeseries of schema template on root.python.GPS")
+        session.execute_non_query_statement(
+            "create timeseries of schema template on root.python.GPS"
+        )
 
         assert session.show_paths_template_set_on(template_name) == ["root.python.GPS"]
-        assert session.show_paths_template_using_on(template_name) == ["root.python.GPS"]
+        assert session.show_paths_template_using_on(template_name) == [
+            "root.python.GPS"
+        ]
 
-        session.execute_non_query_statement("delete timeseries of schema template from root.python.GPS")
+        session.execute_non_query_statement(
+            "delete timeseries of schema template from root.python.GPS"
+        )
 
         session.unset_schema_template(template_name, "root.python.GPS")
         session.drop_schema_template(template_name)