You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/04/10 07:46:09 UTC
[iotdb] branch master updated: [IOTDB-5752] Python Client supports write redirection (#9467)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c951bb6702 [IOTDB-5752] Python Client supports write redirection (#9467)
c951bb6702 is described below
commit c951bb670240e6eaff2fdafafeb061ae91921f4e
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Apr 10 15:46:02 2023 +0800
[IOTDB-5752] Python Client supports write redirection (#9467)
---
client-py/SessionExample.py | 3 +-
client-py/iotdb/Session.py | 580 ++++++++++++++++-----
.../UserGuide/API/Programming-Python-Native-API.md | 6 +-
.../UserGuide/API/Programming-Python-Native-API.md | 6 +-
4 files changed, 458 insertions(+), 137 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index c0ede83b2a..631b3e5d39 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -33,13 +33,14 @@ ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
-# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True)
session = Session.init_from_node_urls(
node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
user="root",
password="root",
fetch_size=1024,
zone_id="UTC+8",
+ enable_redirection=True,
)
session.open(False)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..a1be74eae4 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -25,7 +25,7 @@ from thrift.transport import TSocket, TTransport
from iotdb.utils.SessionDataSet import SessionDataSet
from .template.Template import Template
from .template.TemplateQueryType import TemplateQueryType
-from .thrift.common.ttypes import TEndPoint
+from .thrift.common.ttypes import TEndPoint, TSStatus
from .thrift.rpc.IClientRPCService import (
Client,
TSCreateTimeseriesReq,
@@ -57,17 +57,6 @@ from .thrift.rpc.ttypes import (
TSInsertStringRecordsOfOneDeviceReq,
)
-# for debug
-# from IoTDBConstants import *
-# from SessionDataSet import SessionDataSet
-#
-# from thrift.protocol import TBinaryProtocol, TCompactProtocol
-# from thrift.transport import TSocket, TTransport
-#
-# from iotdb.rpc.IClientRPCService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
-# TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
-# TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
-# from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
from .utils.IoTDBConstants import TSDataType
logger = logging.getLogger("IoTDB")
@@ -91,6 +80,7 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
self.__host = host
self.__port = port
@@ -101,13 +91,16 @@ class Session(object):
self.__password = password
self.__fetch_size = fetch_size
self.__is_close = True
- self.__transport = None
self.__client = None
+ self.__default_connection = None
self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
self.__enable_rpc_compression = None
+ self.__enable_redirection = enable_redirection
+ self.__device_id_to_endpoint = None
+ self.__endpoint_to_connection = None
@classmethod
def init_from_node_urls(
@@ -117,16 +110,19 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
- session = Session(None, None, user, password, fetch_size, zone_id)
+ session = Session(
+ None, None, user, password, fetch_size, zone_id, enable_redirection
+ )
session.__hosts = []
session.__ports = []
for node_url in node_urls:
split = node_url.split(":")
session.__hosts.append(split[0])
- session.__ports.append(split[1])
+ session.__ports.append(int(split[1]))
session.__host = session.__hosts[0]
session.__port = session.__ports[0]
session.__default_endpoint = TEndPoint(session.__host, session.__port)
@@ -137,33 +133,44 @@ class Session(object):
return
self.__enable_rpc_compression = enable_rpc_compression
if self.__hosts is None:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(self.__default_endpoint)
else:
for i in range(0, len(self.__hosts)):
self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
try:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
except Exception as e:
if not self.reconnect():
logger.error("Cluster has no nodes to connect")
raise e
break
+ self.__client = self.__default_connection.client
+ self.__session_id = self.__default_connection.session_id
+ self.__statement_id = self.__default_connection.statement_id
+ self.__is_close = False
+ if self.__enable_redirection:
+ self.__device_id_to_endpoint = {}
+ self.__endpoint_to_connection = {
+ str(self.__default_endpoint): self.__default_connection
+ }
def init_connection(self, endpoint):
- self.__transport = TTransport.TFramedTransport(
+ transport = TTransport.TFramedTransport(
TSocket.TSocket(endpoint.ip, endpoint.port)
)
- if not self.__transport.isOpen():
+ if not transport.isOpen():
try:
- self.__transport.open()
+ transport.open()
except TTransport.TTransportException as e:
raise e
if self.__enable_rpc_compression:
- self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+ client = Client(TCompactProtocol.TCompactProtocol(transport))
else:
- self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+ client = Client(TBinaryProtocol.TBinaryProtocol(transport))
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
@@ -174,7 +181,8 @@ class Session(object):
)
try:
- open_resp = self.__client.openSession(open_req)
+ open_resp = client.openSession(open_req)
+ Session.verify_success(open_resp.status)
if self.protocol_version != open_resp.serverProtocolVersion:
logger.exception(
@@ -186,20 +194,23 @@ class Session(object):
if open_resp.serverProtocolVersion == 0:
raise TTransport.TException(message="Protocol not supported.")
- self.__session_id = open_resp.sessionId
- self.__statement_id = self.__client.requestStatementId(self.__session_id)
+ session_id = open_resp.sessionId
+ statement_id = client.requestStatementId(session_id)
except Exception as e:
- self.__transport.close()
+ transport.close()
logger.exception("session closed because: ", exc_info=e)
raise e
if self.__zone_id is not None:
- self.set_time_zone(self.__zone_id)
+ request = TSSetTimeZoneReq(session_id, self.__zone_id)
+ try:
+ client.setTimeZone(request)
+ except TTransport.TException as e:
+ raise RuntimeError("Could not set time zone because: ", e)
else:
self.__zone_id = self.get_time_zone()
-
- self.__is_close = False
+ return SessionConnection(client, transport, session_id, statement_id)
def is_open(self):
return not self.__is_close
@@ -207,18 +218,16 @@ class Session(object):
def close(self):
if self.__is_close:
return
- req = TSCloseSessionReq(self.__session_id)
try:
- self.__client.closeSession(req)
- except TTransport.TException as e:
- logger.exception(
- "Error occurs when closing session at server. Maybe server is down. Error message: ",
- exc_info=e,
- )
+ if self.__enable_redirection:
+ for connection in self.__endpoint_to_connection.values():
+ req = TSCloseSessionReq(connection.session_id)
+ connection.close_connection(req)
+ else:
+ req = TSCloseSessionReq(self.__session_id)
+ self.__default_connection.close_connection(req)
finally:
self.__is_close = True
- if self.__transport is not None:
- self.__transport.close()
def set_storage_group(self, group_name):
"""
@@ -497,7 +506,13 @@ class Session(object):
device_id, timestamp, measurements, string_values
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertStringRecord(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -523,7 +538,13 @@ class Session(object):
device_id, timestamp, measurements, string_values, True
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertStringRecord(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -554,7 +575,13 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertRecord(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -567,7 +594,7 @@ class Session(object):
raise e
def insert_records(
- self, device_ids, times, measurements_lst, types_lst, values_lst
+ self, device_ids: list, times, measurements_lst, types_lst, values_lst
):
"""
insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -582,21 +609,57 @@ class Session(object):
for types in types_lst:
data_types = [data_type.value for data_type in types]
type_values_lst.append(data_types)
- request = self.gen_insert_records_req(
- device_ids, times, measurements_lst, type_values_lst, values_lst
- )
- try:
- return Session.verify_success(self.__client.insertRecords(request))
- except TTransport.TException as e:
- if self.reconnect():
+ if self.__enable_redirection:
+ request_group = {}
+ for i in range(len(device_ids)):
+ connection = self.get_connection(device_ids[i])
+ request = request_group.setdefault(
+ connection.client,
+ TSInsertRecordsReq(connection.session_id, [], [], [], []),
+ )
+ request.prefixPaths.append(device_ids[i])
+ request.timestamps.append(times[i])
+ request.measurementsList.append(measurements_lst[i])
+ request.valuesList.append(
+ Session.value_to_bytes(type_values_lst[i], values_lst[i])
+ )
+ for client, request in request_group.items():
try:
- request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
- except TTransport.TException as e1:
- logger.exception("insert fails because: ", e1)
- raise e1
- else:
- raise e
+ Session.verify_success_with_redirection_for_multi_devices(
+ client.insertRecords(request), request.prefixPaths
+ )
+ except RedirectException as e:
+ for device, endpoint in e.device_to_endpoint.items():
+ self.handle_redirection(device, endpoint)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ return 0
+ else:
+ request = self.gen_insert_records_req(
+ device_ids, times, measurements_lst, type_values_lst, values_lst
+ )
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecords(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_record(
self, device_id, timestamp, measurements, data_types, values
@@ -617,7 +680,13 @@ class Session(object):
device_id, timestamp, measurements, data_types, values, True
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertRecord(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -645,21 +714,57 @@ class Session(object):
for types in types_lst:
data_types = [data_type.value for data_type in types]
type_values_lst.append(data_types)
- request = self.gen_insert_records_req(
- device_ids, times, measurements_lst, type_values_lst, values_lst, True
- )
- try:
- return Session.verify_success(self.__client.insertRecords(request))
- except TTransport.TException as e:
- if self.reconnect():
+ if self.__enable_redirection:
+ request_group = {}
+ for i in range(len(device_ids)):
+ connection = self.get_connection(device_ids[i])
+ request = request_group.setdefault(
+ connection.client,
+ TSInsertRecordsReq(connection.session_id, [], [], [], [], True),
+ )
+ request.prefixPaths.append(device_ids[i])
+ request.timestamps.append(times[i])
+ request.measurementsList.append(measurements_lst[i])
+ request.valuesList.append(
+ Session.value_to_bytes(type_values_lst[i], values_lst[i])
+ )
+ for client, request in request_group.items():
try:
- request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
- except TTransport.TException as e1:
- logger.exception("insert fails because: ", e1)
- raise e1
- else:
- raise e
+ Session.verify_success_with_redirection_for_multi_devices(
+ client.insertRecords(request), request.prefixPaths
+ )
+ except RedirectException as e:
+ for device, endpoint in e.device_to_endpoint.items():
+ self.handle_redirection(device, endpoint)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ return 0
+ else:
+ request = self.gen_insert_records_req(
+ device_ids, times, measurements_lst, type_values_lst, values_lst, True
+ )
+ try:
+ return Session.verify_success(self.__client.insertRecords(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertRecords(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def test_insert_record(
self, device_id, timestamp, measurements, data_types, values
@@ -805,7 +910,13 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ connection = self.get_connection(tablet.get_device_id())
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertTablet(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(tablet.get_device_id(), e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -822,19 +933,60 @@ class Session(object):
insert multiple tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
- request = self.gen_insert_tablets_req(tablet_lst)
- try:
- return Session.verify_success(self.__client.insertTablets(request))
- except TTransport.TException as e:
- if self.reconnect():
+ if self.__enable_redirection:
+ request_group = {}
+ for i in range(len(tablet_lst)):
+ connection = self.get_connection(tablet_lst[i].get_device_id())
+ request = request_group.setdefault(
+ connection.client,
+ TSInsertTabletsReq(
+ connection.session_id, [], [], [], [], [], [], False
+ ),
+ )
+ request.prefixPaths.append(tablet_lst[i].get_device_id())
+ request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
+ request.measurementsList.append(tablet_lst[i].get_measurements())
+ request.valuesList.append(tablet_lst[i].get_binary_values())
+ request.sizeList.append(tablet_lst[i].get_row_number())
+ data_type_values = [
+ data_type.value for data_type in tablet_lst[i].get_data_types()
+ ]
+ request.typesList.append(data_type_values)
+ for client, request in request_group.items():
try:
- request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
- except TTransport.TException as e1:
- logger.exception("insert fails because: ", e1)
- raise e1
- else:
- raise e
+ Session.verify_success_with_redirection_for_multi_devices(
+ client.insertTablets(request), request.prefixPaths
+ )
+ except RedirectException as e:
+ for device, endpoint in e.device_to_endpoint.items():
+ self.handle_redirection(device, endpoint)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ return 0
+ else:
+ request = self.gen_insert_tablets_req(tablet_lst)
+ try:
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertTablets(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_aligned_tablet(self, tablet):
"""
@@ -850,7 +1002,13 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet, True)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ connection = self.get_connection(tablet.get_device_id())
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertTablet(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(tablet.get_device_id(), e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -867,19 +1025,60 @@ class Session(object):
insert multiple aligned tablets, tablets are independent to each other
:param tablet_lst: List of tablets
"""
- request = self.gen_insert_tablets_req(tablet_lst, True)
- try:
- return Session.verify_success(self.__client.insertTablets(request))
- except TTransport.TException as e:
- if self.reconnect():
+ if self.__enable_redirection:
+ request_group = {}
+ for i in range(len(tablet_lst)):
+ connection = self.get_connection(tablet_lst[i].get_device_id())
+ request = request_group.setdefault(
+ connection.client,
+ TSInsertTabletsReq(
+ connection.session_id, [], [], [], [], [], [], True
+ ),
+ )
+ request.prefixPaths.append(tablet_lst[i].get_device_id())
+ request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
+ request.measurementsList.append(tablet_lst[i].get_measurements())
+ request.valuesList.append(tablet_lst[i].get_binary_values())
+ request.sizeList.append(tablet_lst[i].get_row_number())
+ data_type_values = [
+ data_type.value for data_type in tablet_lst[i].get_data_types()
+ ]
+ request.typesList.append(data_type_values)
+ for client, request in request_group.items():
try:
- request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
- except TTransport.TException as e1:
- logger.exception("insert fails because: ", e1)
- raise e1
- else:
- raise e
+ Session.verify_success_with_redirection_for_multi_devices(
+ client.insertTablets(request), request.prefixPaths
+ )
+ except RedirectException as e:
+ for device, endpoint in e.device_to_endpoint.items():
+ self.handle_redirection(device, endpoint)
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
+ return 0
+ else:
+ request = self.gen_insert_tablets_req(tablet_lst, True)
+ try:
+ return Session.verify_success(self.__client.insertTablets(request))
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return Session.verify_success(
+ self.__client.insertTablets(request)
+ )
+ except TTransport.TException as e1:
+ logger.exception("insert fails because: ", e1)
+ raise e1
+ else:
+ raise e
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
@@ -910,7 +1109,6 @@ class Session(object):
:param measurements_list: measurements list
:param types_list: types list
:param values_list: values list
- :param have_sorted: have these list been sorted by timestamp
"""
# check parameter
size = len(times_list)
@@ -933,9 +1131,13 @@ class Session(object):
device_id, times_list, measurements_list, values_list, types_list
)
try:
- return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertRecordsOfOneDevice(request)
)
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -1002,9 +1204,13 @@ class Session(object):
# send request
try:
- return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertRecordsOfOneDevice(request)
)
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -1054,8 +1260,8 @@ class Session(object):
should be used to test other time cost in client
:param tablet: a tablet of data
"""
+ request = self.gen_insert_tablet_req(tablet)
try:
- request = self.gen_insert_tablet_req(tablet)
return Session.verify_success(self.__client.testInsertTablet(request))
except TTransport.TException as e:
if self.reconnect():
@@ -1076,12 +1282,13 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
+ request = self.gen_insert_tablets_req(tablet_list)
try:
- request = self.gen_insert_tablets_req(tablet_list)
return Session.verify_success(self.__client.testInsertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
+ request.sessionId = self.__session_id
return Session.verify_success(
self.__client.testInsertTablets(request)
)
@@ -1136,7 +1343,8 @@ class Session(object):
"""
execute query sql statement and returns SessionDataSet
:param sql: String, query sql statement
- :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
+ :param timeout:
+ :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py):
"""
request = TSExecuteStatementReq(
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
@@ -1302,7 +1510,7 @@ class Session(object):
return True
@staticmethod
- def verify_success(status):
+ def verify_success(status: TSStatus):
"""
verify success of operation
:param status: execution result status
@@ -1320,7 +1528,7 @@ class Session(object):
raise RuntimeError(str(status.code) + ": " + status.message)
@staticmethod
- def verify_success_by_list(status_list):
+ def verify_success_by_list(status_list: list):
"""
verify success of operation
:param status_list: execution result status
@@ -1334,6 +1542,28 @@ class Session(object):
message += status.message + "; "
raise RuntimeError(message)
+ @staticmethod
+ def verify_success_with_redirection(status: TSStatus):
+ Session.verify_success(status)
+ if status.redirectNode is not None:
+ raise RedirectException(status.redirectNode)
+ return 0
+
+ @staticmethod
+ def verify_success_with_redirection_for_multi_devices(
+ status: TSStatus, devices: list
+ ):
+ Session.verify_success(status)
+ if (
+ status.code == Session.MULTIPLE_ERROR
+ or status.code == Session.REDIRECTION_RECOMMEND
+ ):
+ device_to_endpoint = {}
+ for i in range(len(status.subStatus)):
+ if status.subStatus[i].redirectNode is not None:
+ device_to_endpoint[devices[i]] = status.subStatus[i].redirectNode
+ raise RedirectException(device_to_endpoint)
+
def execute_raw_data_query(
self, paths: list, start_time: int, end_time: int
) -> SessionDataSet:
@@ -1448,9 +1678,13 @@ class Session(object):
device_id, times, measurements_list, values_list, have_sorted, False
)
try:
- return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertStringRecordsOfOneDevice(request)
)
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -1480,9 +1714,13 @@ class Session(object):
device_id, times, measurements_list, values, have_sorted, True
)
try:
- return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ connection = self.get_connection(device_id)
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertStringRecordsOfOneDevice(request)
)
+ except RedirectException as e:
+ return self.handle_redirection(device_id, e.redirect_node)
except TTransport.TException as e:
if self.reconnect():
try:
@@ -1501,29 +1739,71 @@ class Session(object):
return False
connected = False
for i in range(1, self.RETRY_NUM + 1):
- if self.__transport is not None:
- self.__transport.close()
- curr_host_index = random.randint(0, len(self.__hosts))
- try_host_num = 0
- for j in range(curr_host_index, len(self.__hosts)):
- if try_host_num == len(self.__hosts):
- break
- self.__default_endpoint = TEndPoint(
- self.__hosts[j], self.__ports[j]
- )
- if j == len(self.__hosts) - 1:
- j = -1
- try_host_num += 1
- try:
- self.init_connection(self.__default_endpoint)
- connected = True
- except TTransport.TException as e:
- continue
+ if (
+ self.__default_connection is not None
+ and self.__default_connection.transport is not None
+ ):
+ self.__default_connection.transport.close()
+ curr_host_index = random.randint(0, len(self.__hosts))
+ try_host_num = 0
+ for j in range(curr_host_index, len(self.__hosts)):
+ if try_host_num == len(self.__hosts):
break
+ self.__default_endpoint = TEndPoint(self.__hosts[j], self.__ports[j])
+ if j == len(self.__hosts) - 1:
+ j = -1
+ try_host_num += 1
+ try:
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
+ self.__client = self.__default_connection.client
+ self.__session_id = self.__default_connection.session_id
+ self.__statement_id = self.__default_connection.statement_id
+ connected = True
+ if self.__enable_redirection:
+ self.__endpoint_to_connection = {
+ str(self.__default_endpoint): self.__default_connection
+ }
+ except TTransport.TException:
+ continue
+ break
if connected:
break
return connected
+ def get_connection(self, device_id):
+ if (
+ self.__enable_redirection
+ and len(self.__device_id_to_endpoint) != 0
+ and device_id in self.__device_id_to_endpoint
+ ):
+ endpoint = self.__device_id_to_endpoint[device_id]
+ if str(endpoint) in self.__endpoint_to_connection:
+ return self.__endpoint_to_connection[str(endpoint)]
+ return self.__default_connection
+
+ def handle_redirection(self, device_id, endpoint: TEndPoint):
+ if self.__enable_redirection:
+ if endpoint.ip == "0.0.0.0":
+ return 0
+ if (
+ device_id not in self.__device_id_to_endpoint
+ or self.__device_id_to_endpoint[device_id] != endpoint
+ ):
+ self.__device_id_to_endpoint[device_id] = endpoint
+ if str(endpoint) in self.__endpoint_to_connection:
+ connection = self.__endpoint_to_connection[str(endpoint)]
+ else:
+ try:
+ connection = self.init_connection(endpoint)
+ except Exception:
+ connection = None
+ self.__endpoint_to_connection[str(endpoint)] = connection
+ if connection is None:
+ self.__device_id_to_endpoint.pop(device_id)
+ return 0
+
def gen_insert_string_records_of_one_device_request(
self,
device_id,
@@ -1607,9 +1887,10 @@ class Session(object):
is_aligned: bool = False,
):
"""
- add measurements in the template, the template must already create. This function adds some measurements node.
+ add measurements in the template, the template must already create. This function adds some measurements' node.
:param template_name: template name, string list, like ["name_x", "name_y", "name_z"]
- :param measurements_path: when ths is_aligned is True, recommend the name like a.b, like [python.x, python.y, iotdb.z]
+ :param measurements_path: when ths is_aligned is True, recommend the name like a.b,
+ like [python.x, python.y, iotdb.z]
:param data_types: using TSDataType(see IoTDBConstants.py)
:param encodings: using TSEncoding(see IoTDBConstants.py)
:param compressors: using Compressor(see IoTDBConstants.py)
@@ -1800,7 +2081,7 @@ class Session(object):
"""
show all measurements under the pattern in template
:param template_name: template name
- :param pattern: parent path, if default, show all measurements
+ :param pattern: parent path, if defaulted, show all measurements
"""
request = TSQueryTemplateReq(
self.__session_id,
@@ -1904,3 +2185,38 @@ class Session(object):
raise e1
else:
raise e
+
+
+class SessionConnection(object):
+ def __init__(
+ self,
+ client,
+ transport,
+ session_id,
+ statement_id,
+ ):
+ self.client = client
+ self.transport = transport
+ self.session_id = session_id
+ self.statement_id = statement_id
+
+ def close_connection(self, req):
+ try:
+ self.client.closeSession(req)
+ except TTransport.TException as e:
+ logger.exception(
+ "Error occurs when closing session at server. Maybe server is down. Error message: ",
+ exc_info=e,
+ )
+ finally:
+ if self.transport is not None:
+ self.transport.close()
+
+
+class RedirectException(Exception):
+ def __init__(self, redirect_info):
+ Exception.__init__(self)
+ if isinstance(redirect_info, TEndPoint):
+ self.redirect_node = redirect_info
+ else:
+ self.device_to_endpoint = redirect_info
diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md
index 1233a071dd..44fdd90cc3 100644
--- a/docs/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/UserGuide/API/Programming-Python-Native-API.md
@@ -63,7 +63,8 @@ session = Session(
user="root",
password="root",
fetch_size=1024,
- zone_id="UTC+8"
+ zone_id="UTC+8",
+ enable_redirection=True
)
```
@@ -75,7 +76,8 @@ session = Session.init_from_node_urls(
user="root",
password="root",
fetch_size=1024,
- zone_id="UTC+8"
+ zone_id="UTC+8",
+ enable_redirection=True
)
```
diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
index f5025f5d89..8e09dbdac8 100644
--- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md
@@ -64,7 +64,8 @@ session = Session(
user="root",
password="root",
fetch_size=1024,
- zone_id="UTC+8"
+ zone_id="UTC+8",
+ enable_redirection=True
)
```
@@ -76,7 +77,8 @@ session = Session.init_from_node_urls(
user="root",
password="root",
fetch_size=1024,
- zone_id="UTC+8"
+ zone_id="UTC+8",
+ enable_redirection=True
)
```