You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/21 10:27:08 UTC
[iotdb] 01/01: Support connecting multiple nodes in python api
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch py_reconnect
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ed95d9efe8dfff8fd7768ca13ec31d9849a20e1
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 21 18:24:39 2023 +0800
Support connecting multiple nodes in python api
---
client-py/SessionExample.py | 7 +-
client-py/iotdb/Session.py | 800 ++++++++++++++++-------
client-py/iotdb/utils/IoTDBRpcDataSet.py | 6 +-
client-py/iotdb/utils/NumpyTablet.py | 4 +-
client-py/tests/tablet_performance_comparison.py | 4 +-
client-py/tests/test_dataframe.py | 6 +-
client-py/tests/test_delete_data.py | 8 +-
client-py/tests/test_numpy_tablet.py | 7 +-
client-py/tests/test_session.py | 7 +-
client-py/tests/test_template.py | 12 +-
10 files changed, 608 insertions(+), 253 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 1323bf4ef1..5e24e3c41d 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -239,7 +239,12 @@ np_bitmaps_[2].mark(2)
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_with_none = NumpyTablet(
- "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+ "root.sg_test_01.d_02",
+ measurements_,
+ data_types_,
+ np_values_,
+ np_timestamps_,
+ np_bitmaps_,
)
session.insert_tablet(np_tablet_with_none)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 1a4c41326d..56359ae3fd 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,6 +16,7 @@
# under the License.
#
import logging
+import random
import struct
import time
from thrift.protocol import TBinaryProtocol, TCompactProtocol
@@ -24,6 +25,7 @@ from thrift.transport import TSocket, TTransport
from iotdb.utils.SessionDataSet import SessionDataSet
from .template.Template import Template
from .template.TemplateQueryType import TemplateQueryType
+from .thrift.common.ttypes import TEndPoint
from .thrift.rpc.IClientRPCService import (
Client,
TSCreateTimeseriesReq,
@@ -54,6 +56,7 @@ from .thrift.rpc.ttypes import (
TSLastDataQueryReq,
TSInsertStringRecordsOfOneDeviceReq,
)
+
# for debug
# from IoTDBConstants import *
# from SessionDataSet import SessionDataSet
@@ -72,23 +75,34 @@ logger = logging.getLogger("IoTDB")
class Session(object):
SUCCESS_STATUS = 200
+ MULTIPLE_ERROR = 302
REDIRECTION_RECOMMEND = 400
DEFAULT_FETCH_SIZE = 10000
DEFAULT_USER = "root"
DEFAULT_PASSWORD = "root"
DEFAULT_ZONE_ID = time.strftime("%z")
+ RETRY_NUM = 3
def __init__(
self,
- host,
- port,
+ hosts,
+ ports,
user=DEFAULT_USER,
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
):
- self.__host = host
- self.__port = port
+ if isinstance(hosts, list):
+ self.__hosts = hosts
+ self.__host = hosts[0]
+ else:
+ self.__host = hosts
+ if isinstance(ports, list):
+ self.__ports = ports
+ self.__port = ports[0]
+ else:
+ self.__port = ports
+ self.__default_endpoint = TEndPoint(self.__host, self.__port)
self.__user = user
self.__password = password
self.__fetch_size = fetch_size
@@ -99,12 +113,17 @@ class Session(object):
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
+ self.__enable_rpc_compression = None
def open(self, enable_rpc_compression=False):
+ self.__enable_rpc_compression = enable_rpc_compression
+ self.init(self.__default_endpoint)
+
+ def init(self, endpoint):
if not self.__is_close:
return
self.__transport = TTransport.TFramedTransport(
- TSocket.TSocket(self.__host, self.__port)
+ TSocket.TSocket(endpoint.ip, endpoint.port)
)
if not self.__transport.isOpen():
@@ -112,8 +131,9 @@ class Session(object):
self.__transport.open()
except TTransport.TTransportException as e:
logger.exception("TTransportException!", exc_info=e)
+ raise e
- if enable_rpc_compression:
+ if self.__enable_rpc_compression:
self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
else:
self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -145,6 +165,7 @@ class Session(object):
except Exception as e:
self.__transport.close()
logger.exception("session closed because: ", exc_info=e)
+ raise e
if self.__zone_id is not None:
self.set_time_zone(self.__zone_id)
@@ -177,12 +198,21 @@ class Session(object):
create one database
:param group_name: String, database name (starts from root)
"""
- status = self.__client.setStorageGroup(self.__session_id, group_name)
- logger.debug(
- "setting database {} message: {}".format(group_name, status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.setStorageGroup(self.__session_id, group_name)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.setStorageGroup(self.__session_id, group_name)
+ )
+ except TTransport.TException as e1:
+ logger.exception("create databases fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_storage_group(self, storage_group):
"""
@@ -197,14 +227,23 @@ class Session(object):
delete multiple databases.
:param storage_group_lst: List, paths of the target databases.
"""
- status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
- logger.debug(
- "delete database(s) {} message: {}".format(
- storage_group_lst, status.message
+ try:
+ return Session.verify_success(
+ self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
)
- )
-
- return Session.verify_success(status)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.deleteStorageGroups(
+ self.__session_id, storage_group_lst
+ )
+ )
+ except TTransport.TException as e1:
+ logger.exception("delete database fails because: ", e1)
+ raise e1
+ else:
+ raise e
def create_time_series(
self,
@@ -242,12 +281,20 @@ class Session(object):
attributes,
alias,
)
- status = self.__client.createTimeseries(request)
- logger.debug(
- "creating time series {} message: {}".format(ts_path, status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.createTimeseries(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating time series fails because: ", e1)
+ raise e1
+ else:
+ raise e
def create_aligned_time_series(
self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -272,14 +319,21 @@ class Session(object):
encoding_lst,
compressor_lst,
)
- status = self.__client.createAlignedTimeseries(request)
- logger.debug(
- "creating aligned time series of device {} message: {}".format(
- measurements_lst, status.message
+ try:
+ return Session.verify_success(
+ self.__client.createAlignedTimeseries(request)
)
- )
-
- return Session.verify_success(status)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createAlignedTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating time series fails because: ", e1)
+ raise e1
+ raise e
def create_multi_time_series(
self,
@@ -318,28 +372,39 @@ class Session(object):
attributes_lst,
alias_lst,
)
- status = self.__client.createMultiTimeseries(request)
- logger.debug(
- "creating multiple time series {} message: {}".format(
- ts_path_lst, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.createMultiTimeseries(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createMultiTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating multi time series fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_time_series(self, paths_list):
    """
    delete multiple time series, including data and schema
    :param paths_list: List of time series path, which should be complete (starts from root)
+   :return: 0 when the server reports success
+   :raises RuntimeError: when the server answers with an error status
+   :raises TException: when the request fails and no node can be reached again
    """
-   status = self.__client.deleteTimeseries(self.__session_id, paths_list)
-   logger.debug(
-       "deleting multiple time series {} message: {}".format(
-           paths_list, status.message
+   try:
+       return Session.verify_success(
+           self.__client.deleteTimeseries(self.__session_id, paths_list)
        )
-   )
-
-   return Session.verify_success(status)
+   except TTransport.TException as e:
+       if self.reconnect():
+           try:
+               # retry once on the re-established connection
+               return Session.verify_success(
+                   self.__client.deleteTimeseries(self.__session_id, paths_list)
+               )
+           except TTransport.TException as e1:
+               # pass the exception via exc_info; a positional argument would be
+               # treated as a %-format argument for a message without placeholders
+               logger.exception(
+                   "deleting time series fails because: ", exc_info=e1
+               )
+               raise e1
+       else:
+           # reconnect failed: surface the original error instead of
+           # silently returning None
+           raise e
def check_time_series_exists(self, path):
"""
@@ -358,14 +423,21 @@ class Session(object):
:param paths_list: time series list that the data in.
:param end_time: data with time stamp less than or equal to time will be deleted.
"""
- request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+ request = TSDeleteDataReq(
+ self.__session_id, paths_list, -9223372036854775808, end_time
+ )
try:
- status = self.__client.deleteData(request)
- logger.debug(
- "delete data from {}, message: {}".format(paths_list, status.message)
- )
+ return Session.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
- logger.exception("data deletion fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.deleteData(request))
+ except TTransport.TException as e1:
+ logger.exception("data deletion fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_data_in_range(self, paths_list, start_time, end_time):
"""
@@ -376,12 +448,17 @@ class Session(object):
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
- status = self.__client.deleteData(request)
- logger.debug(
- "delete data from {}, message: {}".format(paths_list, status.message)
- )
+ return Session.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
- logger.exception("data deletion fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.deleteData(request))
+ except TTransport.TException as e1:
+ logger.exception("data deletion fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_str_record(self, device_id, timestamp, measurements, string_values):
"""special case for inserting one row of String (TEXT) value"""
@@ -389,18 +466,23 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
- data_types = [TSDataType.TEXT.value for _ in string_values]
request = self.gen_insert_str_record_req(
- device_id, timestamp, measurements, data_types, string_values
+ device_id, timestamp, measurements, string_values
)
- status = self.__client.insertStringRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertStringRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_str_record(
self, device_id, timestamp, measurements, string_values
@@ -410,18 +492,23 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
- data_types = [TSDataType.TEXT.value for _ in string_values]
request = self.gen_insert_str_record_req(
- device_id, timestamp, measurements, data_types, string_values, True
- )
- status = self.__client.insertStringRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
+ device_id, timestamp, measurements, string_values, True
)
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertStringRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_record(self, device_id, timestamp, measurements, data_types, values):
"""
@@ -439,14 +526,18 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
- status = self.__client.insertRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -467,14 +558,18 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst
)
- status = self.__client.insertRecords(request)
- logger.debug(
- "insert multiple records to devices {} message: {}".format(
- device_ids, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_record(
self, device_id, timestamp, measurements, data_types, values
@@ -494,14 +589,18 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values, True
)
- status = self.__client.insertRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -522,14 +621,18 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst, True
)
- status = self.__client.insertRecords(request)
- logger.debug(
- "insert multiple records to devices {} message: {}".format(
- device_ids, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_record(
self, device_id, timestamp, measurements, data_types, values
@@ -547,14 +650,19 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
- status = self.__client.testInsertRecord(request)
- logger.debug(
- "testing! insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.testInsertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -575,12 +683,19 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst
)
- status = self.__client.testInsertRecords(request)
- logger.debug(
- "testing! insert multiple records, message: {}".format(status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.testInsertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertRecords(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_record_req(
self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -600,11 +715,11 @@ class Session(object):
)
def gen_insert_str_record_req(
- self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+ self, device_id, timestamp, measurements, values, is_aligned=False
):
- if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+ if len(values) != len(measurements):
raise RuntimeError(
- "length of data types does not equal to length of values!"
+ "length of measurements does not equal to length of values!"
)
return TSInsertStringRecordReq(
self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -661,24 +776,38 @@ class Session(object):
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
- status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
- logger.debug(
- "insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablet_req(tablet)
+ try:
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_tablets(self, tablet_lst):
    """
    insert multiple tablets, tablets are independent to each other
    :param tablet_lst: List of tablets
+   :return: 0 when the server reports success
+   :raises RuntimeError: when the server answers with an error status
+   :raises TException: when the request fails and cannot be retried successfully
    """
-   status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
-   logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-   return Session.verify_success(status)
+   request = self.gen_insert_tablets_req(tablet_lst)
+   try:
+       return Session.verify_success(self.__client.insertTablets(request))
+   except TTransport.TException as e:
+       if self.reconnect():
+           try:
+               # resend the same request with the session id held after reconnecting
+               request.sessionId = self.__session_id
+               return Session.verify_success(self.__client.insertTablets(request))
+           except TTransport.TException as e1:
+               # pass the exception via exc_info; a positional argument would be
+               # treated as a %-format argument for a message without placeholders
+               logger.exception("insert fails because: ", exc_info=e1)
+               raise e1
+       else:
+           raise e
def insert_aligned_tablet(self, tablet):
"""
@@ -692,26 +821,38 @@ class Session(object):
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
- status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
- logger.debug(
- "insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablet_req(tablet, True)
+ try:
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_tablets(self, tablet_lst):
    """
    insert multiple aligned tablets, tablets are independent to each other
    :param tablet_lst: List of tablets
+   :return: 0 when the server reports success
+   :raises RuntimeError: when the server answers with an error status
+   :raises TException: when the request fails and cannot be retried successfully
    """
-   status = self.__client.insertTablets(
-       self.gen_insert_tablets_req(tablet_lst, True)
-   )
-   logger.debug("insert multiple tablets, message: {}".format(status.message))
-
-   return Session.verify_success(status)
+   request = self.gen_insert_tablets_req(tablet_lst, True)
+   try:
+       return Session.verify_success(self.__client.insertTablets(request))
+   except TTransport.TException as e:
+       if self.reconnect():
+           try:
+               # resend the same request with the session id held after reconnecting
+               request.sessionId = self.__session_id
+               return Session.verify_success(self.__client.insertTablets(request))
+           except TTransport.TException as e1:
+               # pass the exception via exc_info; a positional argument would be
+               # treated as a %-format argument for a message without placeholders
+               logger.exception("insert fails because: ", exc_info=e1)
+               raise e1
+       else:
+           raise e
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
@@ -764,12 +905,22 @@ class Session(object):
request = self.gen_insert_records_of_one_device_request(
device_id, times_list, measurements_list, values_list, types_list
)
-
- # send request
- status = self.__client.insertRecordsOfOneDevice(request)
- logger.debug("insert records of one device, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
@@ -823,10 +974,22 @@ class Session(object):
)
# send request
- status = self.__client.insertRecordsOfOneDevice(request)
- logger.debug("insert records of one device, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_records_of_one_device_request(
self,
@@ -864,14 +1027,21 @@ class Session(object):
should be used to test other time cost in client
:param tablet: a tablet of data
"""
- status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
- logger.debug(
- "testing! insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ request = self.gen_insert_tablet_req(tablet)
+ return Session.verify_success(self.__client.testInsertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.testInsertTablet(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_tablets(self, tablet_list):
"""
@@ -879,14 +1049,20 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
- status = self.__client.testInsertTablets(
- self.gen_insert_tablets_req(tablet_list)
- )
- logger.debug(
- "testing! insert multiple tablets, message: {}".format(status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ request = self.gen_insert_tablets_req(tablet_list)
+ return Session.verify_success(self.__client.testInsertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertTablets(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_tablet_req(self, tablet, is_aligned=False):
data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -938,19 +1114,43 @@ class Session(object):
request = TSExecuteStatementReq(
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
)
- resp = self.__client.executeQueryStatement(request)
- return SessionDataSet(
- sql,
- resp.columns,
- resp.dataTypeList,
- resp.columnNameIndexMap,
- resp.queryId,
- self.__client,
- self.__statement_id,
- self.__session_id,
- resp.queryDataSet,
- resp.ignoreTimeStamp,
- )
+ try:
+ resp = self.__client.executeQueryStatement(request)
+ return SessionDataSet(
+ sql,
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeQueryStatement(request)
+ return SessionDataSet(
+ sql,
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
def execute_non_query_statement(self, sql):
"""
@@ -961,12 +1161,22 @@ class Session(object):
try:
resp = self.__client.executeUpdateStatement(request)
status = resp.status
- logger.debug(
- "execute non-query statement {} message: {}".format(sql, status.message)
- )
return Session.verify_success(status)
except TTransport.TException as e:
- raise RuntimeError("execution of non-query statement fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeUpdateStatement(request)
+ status = resp.status
+ return Session.verify_success(status)
+ except TTransport.TException as e1:
+ logger.exception(
+ "execution of non-query statement fails because: ", e1
+ )
+ raise e1
+ else:
+ raise e
@staticmethod
def value_to_bytes(data_types, values):
@@ -1050,13 +1260,32 @@ class Session(object):
verify success of operation
:param status: execution result status
"""
- if status.code == Session.SUCCESS_STATUS or status.code == Session.REDIRECTION_RECOMMEND:
+ if status.code == Session.MULTIPLE_ERROR:
+ Session.verify_success_by_list(status.subStatus)
+ return 0
+ if (
+ status.code == Session.SUCCESS_STATUS
+ or status.code == Session.REDIRECTION_RECOMMEND
+ ):
return 0
logger.error("error status is %s", status)
- raise RuntimeError(
- "execution of statement fails because: " + status.message
- )
+ raise RuntimeError(status.code + ": " + status.message)
+
+ @staticmethod
+ def verify_success_by_list(status_list):
+     """
+     Report a batch failure: collect the messages of all failed sub statuses
+     and raise them as one error. This function always raises, even when no
+     entry of the list has failed.
+     :param status_list: sub statuses of a MULTIPLE_ERROR result, each having
+         a code and a message; None is treated as an empty list
+     :raises RuntimeError: always; the text is the MULTIPLE_ERROR code followed
+         by "<message>; " for every entry that is neither a success nor a
+         redirection recommendation
+     """
+     # "or []" keeps a result without sub statuses from failing with a
+     # TypeError instead of the intended RuntimeError
+     failed_messages = [
+         status.message + "; "
+         for status in status_list or []
+         if status.code != Session.SUCCESS_STATUS
+         and status.code != Session.REDIRECTION_RECOMMEND
+     ]
+     raise RuntimeError(
+         str(Session.MULTIPLE_ERROR) + ": " + "".join(failed_messages)
+     )
def execute_raw_data_query(
self, paths: list, start_time: int, end_time: int
@@ -1077,19 +1306,43 @@ class Session(object):
statementId=self.__statement_id,
enableRedirectQuery=False,
)
- resp = self.__client.executeRawDataQuery(request)
- return SessionDataSet(
- "",
- resp.columns,
- resp.dataTypeList,
- resp.columnNameIndexMap,
- resp.queryId,
- self.__client,
- self.__statement_id,
- self.__session_id,
- resp.queryDataSet,
- resp.ignoreTimeStamp,
- )
+ try:
+ resp = self.__client.executeRawDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeRawDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet:
"""
@@ -1106,20 +1359,43 @@ class Session(object):
self.__statement_id,
enableRedirectQuery=False,
)
-
- resp = self.__client.executeLastDataQuery(request)
- return SessionDataSet(
- "",
- resp.columns,
- resp.dataTypeList,
- resp.columnNameIndexMap,
- resp.queryId,
- self.__client,
- self.__statement_id,
- self.__session_id,
- resp.queryDataSet,
- resp.ignoreTimeStamp,
- )
+ try:
+ resp = self.__client.executeLastDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeLastDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_string_records_of_one_device(
self,
@@ -1146,12 +1422,22 @@ class Session(object):
request = self.gen_insert_string_records_of_one_device_request(
device_id, times, measurements_list, values_list, have_sorted, False
)
- status = self.__client.insertStringRecordsOfOneDevice(request)
- logger.debug(
- "insert one device {} message: {}".format(device_id, status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.insertStringRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_string_records_of_one_device(
self,
@@ -1168,12 +1454,50 @@ class Session(object):
request = self.gen_insert_string_records_of_one_device_request(
device_id, times, measurements_list, values, have_sorted, True
)
- status = self.__client.insertStringRecordsOfOneDevice(request)
- logger.debug(
- "insert one device {} message: {}".format(device_id, status.message)
- )
+ try:
+ return Session.verify_success(
+ self.__client.insertStringRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
- return Session.verify_success(status)
+    def reconnect(self):
+        """
+        Try to re-establish the connection using the configured node list.
+
+        Each round closes the current transport (if there is one), picks a
+        random starting node and then tries every configured node exactly
+        once, wrapping around the end of the list, until self.init()
+        succeeds. At most RETRY_NUM rounds are made.
+
+        :return: True if a connection was re-established; False if no node
+                 list is configured or every attempt raised a TException.
+        """
+        # No (or an empty) node list means there is nothing to fail over to.
+        if not self.__hosts:
+            return False
+        host_count = len(self.__hosts)
+        # range(RETRY_NUM) performs exactly RETRY_NUM rounds; the previous
+        # range(1, RETRY_NUM) silently performed one fewer.
+        for _ in range(self.RETRY_NUM):
+            if self.__transport is not None:
+                self.__transport.close()
+            # randrange excludes its upper bound, so the start index is always
+            # valid; randint(0, host_count) could return host_count itself and
+            # make the whole round a no-op.
+            start_index = random.randrange(host_count)
+            for offset in range(host_count):
+                # The modulo wraps past the last node back to the first one.
+                # Rebinding a for-loop variable (the old "j = -1") does not
+                # restart the iteration in Python.
+                index = (start_index + offset) % host_count
+                self.__default_endpoint = TEndPoint(
+                    self.__hosts[index], self.__ports[index]
+                )
+                try:
+                    self.init(self.__default_endpoint)
+                except TTransport.TException:
+                    # This node is unreachable; move on to the next candidate.
+                    logger.debug(
+                        "reconnect to %s:%s failed, trying next node",
+                        self.__hosts[index],
+                        self.__ports[index],
+                    )
+                    continue
+                return True
+        return False
def gen_insert_string_records_of_one_device_request(
self,
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index e9dfa7bfe6..6d31629340 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -243,7 +243,9 @@ class IoTDBRpcDataSet(object):
if len(data_array) < total_length:
if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
tmp_array = np.full(total_length, np.nan, np.float32)
- elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
+ elif (
+ data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE
+ ):
tmp_array = np.full(total_length, np.nan, data_array.dtype)
elif data_type == TSDataType.BOOLEAN:
tmp_array = np.full(total_length, np.nan, np.float32)
@@ -252,7 +254,7 @@ class IoTDBRpcDataSet(object):
bitmap_buffer = self.__query_data_set.bitmapList[location]
bitmap_str = self._to_bitstring(bitmap_buffer)
- bit_mask = (np.fromstring(bitmap_str, 'u1') - ord('0')).astype(bool)
+ bit_mask = (np.fromstring(bitmap_str, "u1") - ord("0")).astype(bool)
if len(bit_mask) != total_length:
bit_mask = bit_mask[:total_length]
tmp_array[bit_mask] = data_array
diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py
index 7315b872e5..dbb0d354db 100644
--- a/client-py/iotdb/utils/NumpyTablet.py
+++ b/client-py/iotdb/utils/NumpyTablet.py
@@ -22,7 +22,9 @@ from iotdb.utils.BitMap import BitMap
class NumpyTablet(object):
- def __init__(self, device_id, measurements, data_types, values, timestamps, bitmaps=None):
+ def __init__(
+ self, device_id, measurements, data_types, values, timestamps, bitmaps=None
+ ):
"""
creating a numpy tablet for insertion
for example, considering device: root.sg1.d1
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
index ef5847140c..3626e818a8 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -75,9 +75,9 @@ def generate_csv_data(
if _type == TSDataType.BOOLEAN:
return [random.randint(0, 1) == 1 for _ in range(_row)]
elif _type == TSDataType.INT32:
- return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)]
+ return [random.randint(-(2**31), 2**31) for _ in range(_row)]
elif _type == TSDataType.INT64:
- return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)]
+ return [random.randint(-(2**63), 2**63) for _ in range(_row)]
elif _type == TSDataType.FLOAT:
return [1.5 for _ in range(_row)]
elif _type == TSDataType.DOUBLE:
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index e7492a3b14..801b7c1bcd 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -53,7 +53,9 @@ def test_non_time_query():
session.insert_str_record("root.device0", 123, "pressure", "15.0")
# Read
- session_data_set = session.execute_query_statement("SHOW TIMESERIES root.device0.*")
+ session_data_set = session.execute_query_statement(
+ "SHOW TIMESERIES root.device0.*"
+ )
df = session_data_set.todf()
session.close()
@@ -68,7 +70,7 @@ def test_non_time_query():
"Tags",
"Attributes",
"Deadband",
- "DeadbandParameters"
+ "DeadbandParameters",
]
assert_array_equal(
df.values,
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 031e8ef6e6..8f44574726 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -102,7 +102,9 @@ def test_delete_date():
print_message("insert aligned record of one device failed")
# execute delete data
- session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+ session.delete_data(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+ )
# execute raw data query sql statement
session_data_set = session.execute_raw_data_query(
@@ -127,7 +129,9 @@ def test_delete_date():
assert actual_count == expect_count
# execute delete data
- session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+ session.delete_data_in_range(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+ )
# execute raw data query sql statement
session_data_set = session.execute_raw_data_query(
diff --git a/client-py/tests/test_numpy_tablet.py b/client-py/tests/test_numpy_tablet.py
index ddcb17b070..217df22ea2 100644
--- a/client-py/tests/test_numpy_tablet.py
+++ b/client-py/tests/test_numpy_tablet.py
@@ -100,7 +100,12 @@ def test_numpy_tablet_with_none_serialization():
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_ = NumpyTablet(
- "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+ "root.sg_test_01.d_01",
+ measurements_,
+ data_types_,
+ np_values_,
+ np_timestamps_,
+ np_bitmaps_,
)
assert tablet_.get_binary_timestamps() == np_tablet_.get_binary_timestamps()
assert tablet_.get_binary_values() == np_tablet_.get_binary_values()
diff --git a/client-py/tests/test_session.py b/client-py/tests/test_session.py
index 6fbddb9820..8a466b8717 100644
--- a/client-py/tests/test_session.py
+++ b/client-py/tests/test_session.py
@@ -311,7 +311,12 @@ def test_session():
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_ = NumpyTablet(
- "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
+ "root.sg_test_01.d_01",
+ measurements_,
+ data_types_,
+ np_values_,
+ np_timestamps_,
+ np_bitmaps_,
)
if session.insert_tablet(np_tablet_) < 0:
test_fail()
diff --git a/client-py/tests/test_template.py b/client-py/tests/test_template.py
index e91a8a0990..93cfca40cf 100644
--- a/client-py/tests/test_template.py
+++ b/client-py/tests/test_template.py
@@ -92,12 +92,18 @@ def test_set_template():
session.execute_non_query_statement("CREATE DATABASE root.python")
session.set_schema_template(template_name, "root.python.GPS")
- session.execute_non_query_statement("create timeseries of schema template on root.python.GPS")
+ session.execute_non_query_statement(
+ "create timeseries of schema template on root.python.GPS"
+ )
assert session.show_paths_template_set_on(template_name) == ["root.python.GPS"]
- assert session.show_paths_template_using_on(template_name) == ["root.python.GPS"]
+ assert session.show_paths_template_using_on(template_name) == [
+ "root.python.GPS"
+ ]
- session.execute_non_query_statement("delete timeseries of schema template from root.python.GPS")
+ session.execute_non_query_statement(
+ "delete timeseries of schema template from root.python.GPS"
+ )
session.unset_schema_template(template_name, "root.python.GPS")
session.drop_schema_template(template_name)