You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2023/03/22 15:32:21 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 2280534d82 [To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)
2280534d82 is described below
commit 2280534d8289867b37df40617ec5502a4ec1f45d
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Mar 22 23:32:08 2023 +0800
[To rel/0.13][IOTDB-5711] Support connecting multiple nodes in Python API (#9416)
---
client-py/SessionExample.py | 10 +-
client-py/iotdb/Session.py | 651 +++++++++++++++------
client-py/tests/test_dataframe.py | 5 +-
client-py/tests/test_delete_data.py | 9 +-
.../UserGuide/API/Programming-Python-Native-API.md | 21 +-
.../UserGuide/API/Programming-Python-Native-API.md | 21 +-
6 files changed, 523 insertions(+), 194 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index bbc9669527..6dda1c49e0 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -29,7 +29,14 @@ ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session.init_from_node_urls(
+ node_urls=["127.0.0.1:6667", "127.0.0.1:6668"],
+ user="root",
+ password="root",
+ fetch_size=1024,
+ zone_id="UTC+8",
+)
session.open(False)
# set and delete storage groups
@@ -215,6 +222,7 @@ np_tablet_unsorted = NumpyTablet(
np_values_unsorted,
np_timestamps_unsorted,
)
+
session.insert_tablet(np_tablet_unsorted)
print(np_tablet_unsorted.get_timestamps())
for value in np_tablet_unsorted.get_values():
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 145fd94370..cd5a12f67f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -16,14 +16,21 @@
# under the License.
#
import logging
+import random
import struct
import time
-from iotdb.utils.SessionDataSet import SessionDataSet
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport
+from iotdb.utils.SessionDataSet import SessionDataSet
+from .thrift.rpc.ttypes import (
+ EndPoint,
+ TSRawDataQueryReq,
+ TSLastDataQueryReq,
+ TSInsertStringRecordsOfOneDeviceReq,
+)
from .thrift.rpc.TSIService import (
Client,
TSCreateTimeseriesReq,
@@ -34,7 +41,6 @@ from .thrift.rpc.TSIService import (
TSExecuteStatementReq,
TSOpenSessionReq,
TSCreateMultiTimeseriesReq,
- TSCreateSchemaTemplateReq,
TSCloseSessionReq,
TSInsertTabletsReq,
TSInsertRecordsReq,
@@ -65,6 +71,7 @@ class Session(object):
DEFAULT_USER = "root"
DEFAULT_PASSWORD = "root"
DEFAULT_ZONE_ID = time.strftime("%z")
+ RETRY_NUM = 3
def __init__(
self,
@@ -77,6 +84,9 @@ class Session(object):
):
self.__host = host
self.__port = port
+ self.__hosts = None
+ self.__ports = None
+ self.__default_endpoint = EndPoint(self.__host, self.__port)
self.__user = user
self.__password = password
self.__fetch_size = fetch_size
@@ -87,21 +97,60 @@ class Session(object):
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
+ self.__enable_rpc_compression = None
+
+ @classmethod
+ def init_from_node_urls(
+ cls,
+ node_urls,
+ user=DEFAULT_USER,
+ password=DEFAULT_PASSWORD,
+ fetch_size=DEFAULT_FETCH_SIZE,
+ zone_id=DEFAULT_ZONE_ID,
+ ):
+ if node_urls is None:
+ raise RuntimeError("node urls is empty")
+ session = Session(None, None, user, password, fetch_size, zone_id)
+ session.__hosts = []
+ session.__ports = []
+ for node_url in node_urls:
+ split = node_url.split(":")
+ session.__hosts.append(split[0])
+ session.__ports.append(split[1])
+ session.__host = session.__hosts[0]
+ session.__port = session.__ports[0]
+ session.__default_endpoint = EndPoint(session.__host, session.__port)
+ return session
def open(self, enable_rpc_compression):
if not self.__is_close:
return
+ self.__enable_rpc_compression = enable_rpc_compression
+ if self.__hosts is None:
+ self.init_connection(self.__default_endpoint)
+ else:
+ for i in range(0, len(self.__hosts)):
+ self.__default_endpoint = EndPoint(self.__hosts[i], self.__ports[i])
+ try:
+ self.init_connection(self.__default_endpoint)
+ except Exception as e:
+ if not self.reconnect():
+ logger.error("Cluster has no nodes to connect")
+ raise e
+ break
+
+ def init_connection(self, endpoint):
self.__transport = TTransport.TFramedTransport(
- TSocket.TSocket(self.__host, self.__port)
+ TSocket.TSocket(endpoint.ip, endpoint.port)
)
if not self.__transport.isOpen():
try:
self.__transport.open()
except TTransport.TTransportException as e:
- logger.exception("TTransportException!", exc_info=e)
+ raise e
- if enable_rpc_compression:
+ if self.__enable_rpc_compression:
self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
else:
self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
@@ -133,6 +182,7 @@ class Session(object):
except Exception as e:
self.__transport.close()
logger.exception("session closed because: ", exc_info=e)
+ raise e
if self.__zone_id is not None:
self.set_time_zone(self.__zone_id)
@@ -165,12 +215,21 @@ class Session(object):
set one storage group
:param group_name: String, storage group name (starts from root)
"""
- status = self.__client.setStorageGroup(self.__session_id, group_name)
- logger.debug(
- "setting storage group {} message: {}".format(group_name, status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.setStorageGroup(self.__session_id, group_name)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.setStorageGroup(self.__session_id, group_name)
+ )
+ except TTransport.TException as e1:
+ logger.exception("create databases fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_storage_group(self, storage_group):
"""
@@ -185,14 +244,23 @@ class Session(object):
delete multiple storage groups.
:param storage_group_lst: List, paths of the target storage groups.
"""
- status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
- logger.debug(
- "delete storage group(s) {} message: {}".format(
- storage_group_lst, status.message
+ try:
+ return Session.verify_success(
+ self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
)
- )
-
- return Session.verify_success(status)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.deleteStorageGroups(
+ self.__session_id, storage_group_lst
+ )
+ )
+ except TTransport.TException as e1:
+ logger.exception("delete database fails because: ", e1)
+ raise e1
+ else:
+ raise e
def create_time_series(
self,
@@ -230,12 +298,20 @@ class Session(object):
attributes,
alias,
)
- status = self.__client.createTimeseries(request)
- logger.debug(
- "creating time series {} message: {}".format(ts_path, status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.createTimeseries(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating time series fails because: ", e1)
+ raise e1
+ else:
+ raise e
def create_aligned_time_series(
self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
@@ -260,14 +336,21 @@ class Session(object):
encoding_lst,
compressor_lst,
)
- status = self.__client.createAlignedTimeseries(request)
- logger.debug(
- "creating aligned time series of device {} message: {}".format(
- measurements_lst, status.message
+ try:
+ return Session.verify_success(
+ self.__client.createAlignedTimeseries(request)
)
- )
-
- return Session.verify_success(status)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createAlignedTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating time series fails because: ", e1)
+ raise e1
+ raise e
def create_multi_time_series(
self,
@@ -306,28 +389,39 @@ class Session(object):
attributes_lst,
alias_lst,
)
- status = self.__client.createMultiTimeseries(request)
- logger.debug(
- "creating multiple time series {} message: {}".format(
- ts_path_lst, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.createMultiTimeseries(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.createMultiTimeseries(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("creating multi time series fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_time_series(self, paths_list):
"""
delete multiple time series, including data and schema
:param paths_list: List of time series path, which should be complete (starts from root)
"""
- status = self.__client.deleteTimeseries(self.__session_id, paths_list)
- logger.debug(
- "deleting multiple time series {} message: {}".format(
- paths_list, status.message
+ try:
+ return Session.verify_success(
+ self.__client.deleteTimeseries(self.__session_id, paths_list)
)
- )
-
- return Session.verify_success(status)
+ except TTransport.TException:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.deleteTimeseries(self.__session_id, paths_list)
+ )
+ except TTransport.TException as e1:
+ logger.exception("deleting time series fails because: ", e1)
+ raise e1
def check_time_series_exists(self, path):
"""
@@ -346,14 +440,21 @@ class Session(object):
:param paths_list: time series list that the data in.
:param end_time: data with time stamp less than or equal to time will be deleted.
"""
- request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time)
+ request = TSDeleteDataReq(
+ self.__session_id, paths_list, -9223372036854775808, end_time
+ )
try:
- status = self.__client.deleteData(request)
- logger.debug(
- "delete data from {}, message: {}".format(paths_list, status.message)
- )
+ return Session.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
- logger.exception("data deletion fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.deleteData(request))
+ except TTransport.TException as e1:
+ logger.exception("data deletion fails because: ", e1)
+ raise e1
+ else:
+ raise e
def delete_data_in_range(self, paths_list, start_time, end_time):
"""
@@ -364,12 +465,17 @@ class Session(object):
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
- status = self.__client.deleteData(request)
- logger.debug(
- "delete data from {}, message: {}".format(paths_list, status.message)
- )
+ return Session.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
- logger.exception("data deletion fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.deleteData(request))
+ except TTransport.TException as e1:
+ logger.exception("data deletion fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_str_record(self, device_id, timestamp, measurements, string_values):
"""special case for inserting one row of String (TEXT) value"""
@@ -377,18 +483,23 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
- data_types = [TSDataType.TEXT.value for _ in string_values]
request = self.gen_insert_str_record_req(
- device_id, timestamp, measurements, data_types, string_values
- )
- status = self.__client.insertStringRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
+ device_id, timestamp, measurements, string_values
)
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertStringRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_str_record(
self, device_id, timestamp, measurements, string_values
@@ -398,18 +509,23 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
- data_types = [TSDataType.TEXT.value for _ in string_values]
request = self.gen_insert_str_record_req(
- device_id, timestamp, measurements, data_types, string_values, True
+ device_id, timestamp, measurements, string_values, True
)
- status = self.__client.insertStringRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertStringRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertStringRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_record(self, device_id, timestamp, measurements, data_types, values):
"""
@@ -427,14 +543,18 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
- status = self.__client.insertRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -455,14 +575,18 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst
)
- status = self.__client.insertRecords(request)
- logger.debug(
- "insert multiple records to devices {} message: {}".format(
- device_ids, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_record(
self, device_id, timestamp, measurements, data_types, values
@@ -482,14 +606,18 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values, True
)
- status = self.__client.insertRecord(request)
- logger.debug(
- "insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecord(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -510,14 +638,18 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst, True
)
- status = self.__client.insertRecords(request)
- logger.debug(
- "insert multiple records to devices {} message: {}".format(
- device_ids, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_record(
self, device_id, timestamp, measurements, data_types, values
@@ -535,14 +667,19 @@ class Session(object):
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
- status = self.__client.testInsertRecord(request)
- logger.debug(
- "testing! insert one record to device {} message: {}".format(
- device_id, status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.testInsertRecord(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertRecord(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_records(
self, device_ids, times, measurements_lst, types_lst, values_lst
@@ -563,12 +700,19 @@ class Session(object):
request = self.gen_insert_records_req(
device_ids, times, measurements_lst, type_values_lst, values_lst
)
- status = self.__client.testInsertRecords(request)
- logger.debug(
- "testing! insert multiple records, message: {}".format(status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(self.__client.testInsertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertRecords(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_record_req(
self, device_id, timestamp, measurements, data_types, values, is_aligned=False
@@ -588,11 +732,11 @@ class Session(object):
)
def gen_insert_str_record_req(
- self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+ self, device_id, timestamp, measurements, values, is_aligned=False
):
- if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+ if len(values) != len(measurements):
raise RuntimeError(
- "length of data types does not equal to length of values!"
+ "length of measurements does not equal to length of values!"
)
return TSInsertStringRecordReq(
self.__session_id, device_id, measurements, values, timestamp, is_aligned
@@ -649,24 +793,38 @@ class Session(object):
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
- status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
- logger.debug(
- "insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablet_req(tablet)
+ try:
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_tablets(self, tablet_lst):
"""
insert multiple tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
- status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
- logger.debug("insert multiple tablets, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablets_req(tablet_lst)
+ try:
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_tablet(self, tablet):
"""
@@ -680,26 +838,38 @@ class Session(object):
The tablet itself is sorted (see docs of Tablet.py)
:param tablet: a tablet specified above
"""
- status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
- logger.debug(
- "insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablet_req(tablet, True)
+ try:
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_tablets(self, tablet_lst):
"""
insert multiple aligned tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
- status = self.__client.insertTablets(
- self.gen_insert_tablets_req(tablet_lst, True)
- )
- logger.debug("insert multiple tablets, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ request = self.gen_insert_tablets_req(tablet_lst, True)
+ try:
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
@@ -752,12 +922,22 @@ class Session(object):
request = self.gen_insert_records_of_one_device_request(
device_id, times_list, measurements_list, values_list, types_list
)
-
- # send request
- status = self.__client.insertRecordsOfOneDevice(request)
- logger.debug("insert records of one device, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
@@ -811,10 +991,22 @@ class Session(object):
)
# send request
- status = self.__client.insertRecordsOfOneDevice(request)
- logger.debug("insert records of one device, message: {}".format(status.message))
-
- return Session.verify_success(status)
+ try:
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecordsOfOneDevice(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_records_of_one_device_request(
self,
@@ -852,14 +1044,21 @@ class Session(object):
should be used to test other time cost in client
:param tablet: a tablet of data
"""
- status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
- logger.debug(
- "testing! insert one tablet to device {} message: {}".format(
- tablet.get_device_id(), status.message
- )
- )
-
- return Session.verify_success(status)
+ try:
+ request = self.gen_insert_tablet_req(tablet)
+ return Session.verify_success(self.__client.testInsertTablet(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.testInsertTablet(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_tablets(self, tablet_list):
"""
@@ -867,14 +1066,20 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
- status = self.__client.testInsertTablets(
- self.gen_insert_tablets_req(tablet_list)
- )
- logger.debug(
- "testing! insert multiple tablets, message: {}".format(status.message)
- )
-
- return Session.verify_success(status)
+ try:
+ request = self.gen_insert_tablets_req(tablet_list)
+ return Session.verify_success(self.__client.testInsertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ return Session.verify_success(
+ self.__client.testInsertTablets(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("test insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def gen_insert_tablet_req(self, tablet, is_aligned=False):
data_type_values = [data_type.value for data_type in tablet.get_data_types()]
@@ -926,7 +1131,20 @@ class Session(object):
request = TSExecuteStatementReq(
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
)
- resp = self.__client.executeQueryStatement(request)
+ try:
+ resp = self.__client.executeQueryStatement(request)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeQueryStatement(request)
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ Session.verify_success(resp.status)
return SessionDataSet(
sql,
resp.columns,
@@ -949,12 +1167,22 @@ class Session(object):
try:
resp = self.__client.executeUpdateStatement(request)
status = resp.status
- logger.debug(
- "execute non-query statement {} message: {}".format(sql, status.message)
- )
return Session.verify_success(status)
except TTransport.TException as e:
- raise RuntimeError("execution of non-query statement fails because: ", e)
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeUpdateStatement(request)
+ status = resp.status
+ return Session.verify_success(status)
+ except TTransport.TException as e1:
+ logger.exception(
+ "execution of non-query statement fails because: ", e1
+ )
+ raise e1
+ else:
+ raise e
@staticmethod
def value_to_bytes(data_types, values):
@@ -1075,7 +1303,20 @@ class Session(object):
statementId=self.__statement_id,
enableRedirectQuery=False,
)
- resp = self.__client.executeRawDataQuery(request)
+ try:
+ resp = self.__client.executeRawDataQuery(request)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeRawDataQuery(request)
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ Session.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
@@ -1104,8 +1345,20 @@ class Session(object):
self.__statement_id,
enableRedirectQuery=False,
)
-
- resp = self.__client.executeLastDataQuery(request)
+ try:
+ resp = self.__client.executeLastDataQuery(request)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ request.statementId = self.__statement_id
+ resp = self.__client.executeLastDataQuery(request)
+ except TTransport.TException as e1:
+ logger.exception("execution of query statement fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ Session.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
@@ -1118,3 +1371,29 @@ class Session(object):
resp.queryDataSet,
resp.ignoreTimeStamp,
)
+
+ def reconnect(self):
+ if self.__hosts is None:
+ return False
+ connected = False
+ for i in range(1, self.RETRY_NUM + 1):
+ if self.__transport is not None:
+ self.__transport.close()
+ curr_host_index = random.randint(0, len(self.__hosts))
+ try_host_num = 0
+ for j in range(curr_host_index, len(self.__hosts)):
+ if try_host_num == len(self.__hosts):
+ break
+ self.__default_endpoint = EndPoint(self.__hosts[j], self.__ports[j])
+ if j == len(self.__hosts) - 1:
+ j = -1
+ try_host_num += 1
+ try:
+ self.init_connection(self.__default_endpoint)
+ connected = True
+ except TTransport.TException as e:
+ continue
+ break
+ if connected:
+ break
+ return connected
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index f957c2da42..05aa8eb141 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -67,11 +67,12 @@ def test_non_time_query():
"tags",
"attributes",
"deadband",
- "deadband parameters"
+ "deadband parameters",
]
assert_array_equal(
df.values,
- [[
+ [
+ [
"root.device1.pressure",
None,
"root.device1",
diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py
index 7e9df9ac7e..b0a9f014c2 100644
--- a/client-py/tests/test_delete_data.py
+++ b/client-py/tests/test_delete_data.py
@@ -114,7 +114,9 @@ def test_delete_date():
print_message("insert aligned record of one device failed")
# execute delete data
- session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1)
+ session.delete_data(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1
+ )
# execute raw data query sql statement
session_data_set = session.execute_query_statement(
@@ -140,7 +142,9 @@ def test_delete_date():
assert actual_count == expect_count
# execute delete data
- session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3)
+ session.delete_data_in_range(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3
+ )
# execute raw data query sql statement
session_data_set = session.execute_query_statement(
@@ -175,4 +179,3 @@ else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
-
diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md
index 4c4ee5e9c6..99ac94cfe6 100644
--- a/docs/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/UserGuide/API/Programming-Python-Native-API.md
@@ -59,7 +59,26 @@ session.close()
* Initialize a Session
```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+ ip="127.0.0.1",
+ port="6667",
+ user="root",
+ password="root",
+ fetch_size=1024,
+ zone_id="UTC+8"
+)
+```
+
+* Initialize a Session to connect multiple nodes
+
+```python
+session = Session.init_from_node_urls(
+ node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+ user="root",
+ password="root",
+ fetch_size=1024,
+ zone_id="UTC+8"
+)
```
* Open a session, with a parameter to specify whether to enable RPC compression
diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
index 5038e023b1..7df7994abf 100644
--- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
@@ -60,7 +60,26 @@ session.close()
* 初始化 Session
```python
-session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session = Session(
+ ip="127.0.0.1",
+ port="6667",
+ user="root",
+ password="root",
+ fetch_size=1024,
+ zone_id="UTC+8"
+)
+```
+
+* 初始化可连接多节点的 Session
+
+```python
+session = Session.init_from_node_urls(
+ node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+ user="root",
+ password="root",
+ fetch_size=1024,
+ zone_id="UTC+8"
+)
```
* 开启 Session,并决定是否开启 RPC 压缩