You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/27 10:56:42 UTC
[iotdb] 01/01: [Draft] python cache leader
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d21dccfb531e30069132249fb41d0c6dcf507942
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Mar 27 18:52:38 2023 +0800
[Draft] python cache leader
---
client-py/iotdb/Session.py | 383 ++++++++++++++++++++++++++++++---------------
1 file changed, 261 insertions(+), 122 deletions(-)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..ac2e7a6f7f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -91,6 +91,7 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
self.__host = host
self.__port = port
@@ -101,13 +102,16 @@ class Session(object):
self.__password = password
self.__fetch_size = fetch_size
self.__is_close = True
- self.__transport = None
- self.__client = None
+ self.__default_client = None
+ self.__default_connection = None
self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
self.__enable_rpc_compression = None
+ self.__enable_redirection = enable_redirection
+ self.__device_id_to_endpoint = None
+ self.__endpoint_to_connection = None
@classmethod
def init_from_node_urls(
@@ -117,10 +121,13 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
- session = Session(None, None, user, password, fetch_size, zone_id)
+ session = Session(
+ None, None, user, password, fetch_size, zone_id, enable_redirection
+ )
session.__hosts = []
session.__ports = []
for node_url in node_urls:
@@ -137,34 +144,45 @@ class Session(object):
return
self.__enable_rpc_compression = enable_rpc_compression
if self.__hosts is None:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(self.__default_endpoint)
+ self.__default_client = self.__default_connection.client
else:
for i in range(0, len(self.__hosts)):
self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
try:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
+ self.__default_client = self.__default_connection.client
except Exception as e:
if not self.reconnect():
logger.error("Cluster has no nodes to connect")
raise e
break
+ self.__is_close = False
+ if self.__enable_redirection:
+ self.__device_id_to_endpoint = {}
+ self.__endpoint_to_connection = {
+ self.__default_endpoint: self.__default_connection
+ }
def init_connection(self, endpoint):
- self.__transport = TTransport.TFramedTransport(
+ transport = TTransport.TFramedTransport(
TSocket.TSocket(endpoint.ip, endpoint.port)
)
- if not self.__transport.isOpen():
+ if not transport.isOpen():
try:
- self.__transport.open()
+ transport.open()
except TTransport.TTransportException as e:
raise e
if self.__enable_rpc_compression:
- self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+ client = Client(TCompactProtocol.TCompactProtocol(transport))
else:
- self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+ client = Client(TBinaryProtocol.TBinaryProtocol(transport))
+ connection = SessionConnection(client, transport)
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
username=self.__user,
@@ -174,7 +192,8 @@ class Session(object):
)
try:
- open_resp = self.__client.openSession(open_req)
+ open_resp = client.openSession(open_req)
+ Session.verify_success(open_resp.status)
if self.protocol_version != open_resp.serverProtocolVersion:
logger.exception(
@@ -187,19 +206,23 @@ class Session(object):
raise TTransport.TException(message="Protocol not supported.")
self.__session_id = open_resp.sessionId
- self.__statement_id = self.__client.requestStatementId(self.__session_id)
+ self.__statement_id = client.requestStatementId(self.__session_id)
except Exception as e:
- self.__transport.close()
+ transport.close()
logger.exception("session closed because: ", exc_info=e)
raise e
if self.__zone_id is not None:
- self.set_time_zone(self.__zone_id)
+ request = TSSetTimeZoneReq(self.__session_id, self.__zone_id)
+ try:
+ client.setTimeZone(request)
+ except TTransport.TException as e:
+ raise RuntimeError("Could not set time zone because: ", e)
else:
self.__zone_id = self.get_time_zone()
- self.__is_close = False
+ return connection
def is_open(self):
return not self.__is_close
@@ -209,16 +232,13 @@ class Session(object):
return
req = TSCloseSessionReq(self.__session_id)
try:
- self.__client.closeSession(req)
- except TTransport.TException as e:
- logger.exception(
- "Error occurs when closing session at server. Maybe server is down. Error message: ",
- exc_info=e,
- )
+ if self.__enable_redirection:
+ for connection in self.__endpoint_to_connection.values():
+ connection.close_connection(req)
+ else:
+ self.__default_connection.close_connection(req)
finally:
self.__is_close = True
- if self.__transport is not None:
- self.__transport.close()
def set_storage_group(self, group_name):
"""
@@ -227,13 +247,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.setStorageGroup(self.__session_id, group_name)
+ self.__default_client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.setStorageGroup(self.__session_id, group_name)
+ self.__default_client.setStorageGroup(
+ self.__session_id, group_name
+ )
)
except TTransport.TException as e1:
logger.exception("create databases fails because: ", e1)
@@ -256,13 +278,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
+ self.__default_client.deleteStorageGroups(
+ self.__session_id, storage_group_lst
+ )
)
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.deleteStorageGroups(
+ self.__default_client.deleteStorageGroups(
self.__session_id, storage_group_lst
)
)
@@ -309,13 +333,15 @@ class Session(object):
alias,
)
try:
- return Session.verify_success(self.__client.createTimeseries(request))
+ return Session.verify_success(
+ self.__default_client.createTimeseries(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createTimeseries(request)
+ self.__default_client.createTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating time series fails because: ", e1)
@@ -348,14 +374,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.createAlignedTimeseries(request)
+ self.__default_client.createAlignedTimeseries(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createAlignedTimeseries(request)
+ self.__default_client.createAlignedTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating time series fails because: ", e1)
@@ -400,13 +426,15 @@ class Session(object):
alias_lst,
)
try:
- return Session.verify_success(self.__client.createMultiTimeseries(request))
+ return Session.verify_success(
+ self.__default_client.createMultiTimeseries(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createMultiTimeseries(request)
+ self.__default_client.createMultiTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating multi time series fails because: ", e1)
@@ -421,13 +449,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.deleteTimeseries(self.__session_id, paths_list)
+ self.__default_client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.deleteTimeseries(self.__session_id, paths_list)
+ self.__default_client.deleteTimeseries(
+ self.__session_id, paths_list
+ )
)
except TTransport.TException as e1:
logger.exception("deleting time series fails because: ", e1)
@@ -454,12 +484,14 @@ class Session(object):
self.__session_id, paths_list, -9223372036854775808, end_time
)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(self.__default_client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(
+ self.__default_client.deleteData(request)
+ )
except TTransport.TException as e1:
logger.exception("data deletion fails because: ", e1)
raise e1
@@ -475,12 +507,14 @@ class Session(object):
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(self.__default_client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(
+ self.__default_client.deleteData(request)
+ )
except TTransport.TException as e1:
logger.exception("data deletion fails because: ", e1)
raise e1
@@ -497,13 +531,15 @@ class Session(object):
device_id, timestamp, measurements, string_values
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertStringRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecord(request)
+ self.__default_client.insertStringRecord(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -523,13 +559,15 @@ class Session(object):
device_id, timestamp, measurements, string_values, True
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertStringRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecord(request)
+ self.__default_client.insertStringRecord(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -554,12 +592,16 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.__default_client.insertRecord(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -586,12 +628,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(self.__default_client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(
+ self.__default_client.insertRecords(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -617,12 +661,16 @@ class Session(object):
device_id, timestamp, measurements, data_types, values, True
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.__default_client.insertRecord(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -649,12 +697,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst, True
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(self.__default_client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(
+ self.__default_client.insertRecords(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -678,12 +728,14 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.testInsertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).testInsertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.testInsertRecord(request)
+ self.__default_client.testInsertRecord(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -711,12 +763,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst
)
try:
- return Session.verify_success(self.__client.testInsertRecords(request))
+ return Session.verify_success(
+ self.__default_client.testInsertRecords(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.testInsertRecords(request)
+ self.__default_client.testInsertRecords(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -805,12 +859,16 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).insertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.__default_client.insertTablet(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -824,12 +882,14 @@ class Session(object):
"""
request = self.gen_insert_tablets_req(tablet_lst)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(self.__default_client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(
+ self.__default_client.insertTablets(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -850,12 +910,16 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet, True)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).insertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.__default_client.insertTablet(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -869,12 +933,14 @@ class Session(object):
"""
request = self.gen_insert_tablets_req(tablet_lst, True)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(self.__default_client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(
+ self.__default_client.insertTablets(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -934,14 +1000,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.get_client(device_id).insertRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.__default_client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1003,14 +1069,14 @@ class Session(object):
# send request
try:
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.get_client(device_id).insertRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.__default_client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1054,15 +1120,17 @@ class Session(object):
should be used to test other time cost in client
:param tablet: a tablet of data
"""
+ request = self.gen_insert_tablet_req(tablet)
try:
- request = self.gen_insert_tablet_req(tablet)
- return Session.verify_success(self.__client.testInsertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).testInsertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.testInsertTablet(request)
+ self.__default_client.testInsertTablet(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -1076,14 +1144,17 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
+ request = self.gen_insert_tablets_req(tablet_list)
try:
- request = self.gen_insert_tablets_req(tablet_list)
- return Session.verify_success(self.__client.testInsertTablets(request))
+ return Session.verify_success(
+ self.__default_client.testInsertTablets(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
+ request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.testInsertTablets(request)
+ self.__default_client.testInsertTablets(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -1142,13 +1213,13 @@ class Session(object):
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
)
try:
- resp = self.__client.executeQueryStatement(request)
+ resp = self.__default_client.executeQueryStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeQueryStatement(request)
+ resp = self.__default_client.executeQueryStatement(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1161,7 +1232,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1175,13 +1246,13 @@ class Session(object):
"""
request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
try:
- resp = self.__client.executeUpdateStatement(request)
+ resp = self.__default_client.executeUpdateStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeUpdateStatement(request)
+ resp = self.__default_client.executeUpdateStatement(request)
except TTransport.TException as e1:
logger.exception(
"execution of non-query statement fails because: ", e1
@@ -1196,13 +1267,13 @@ class Session(object):
self.__session_id, sql, self.__statement_id, timeout
)
try:
- resp = self.__client.executeStatement(request)
+ resp = self.__default_client.executeStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeStatement(request)
+ resp = self.__default_client.executeStatement(request)
except TTransport.TException as e1:
logger.exception("execution of statement fails because: ", e1)
raise e1
@@ -1216,7 +1287,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1276,7 +1347,7 @@ class Session(object):
if self.__zone_id is not None:
return self.__zone_id
try:
- resp = self.__client.getTimeZone(self.__session_id)
+ resp = self.__default_client.getTimeZone(self.__session_id)
except TTransport.TException as e:
raise RuntimeError("Could not get time zone because: ", e)
return resp.timeZone
@@ -1284,7 +1355,7 @@ class Session(object):
def set_time_zone(self, zone_id):
request = TSSetTimeZoneReq(self.__session_id, zone_id)
try:
- status = self.__client.setTimeZone(request)
+ status = self.__default_client.setTimeZone(request)
logger.debug(
"setting time zone_id as {}, message: {}".format(
zone_id, status.message
@@ -1354,13 +1425,13 @@ class Session(object):
enableRedirectQuery=False,
)
try:
- resp = self.__client.executeRawDataQuery(request)
+ resp = self.__default_client.executeRawDataQuery(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeRawDataQuery(request)
+ resp = self.__default_client.executeRawDataQuery(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1373,7 +1444,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1396,13 +1467,13 @@ class Session(object):
enableRedirectQuery=False,
)
try:
- resp = self.__client.executeLastDataQuery(request)
+ resp = self.__default_client.executeLastDataQuery(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeLastDataQuery(request)
+ resp = self.__default_client.executeLastDataQuery(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1415,7 +1486,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1449,14 +1520,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.get_client(device_id).insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.__default_client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1481,14 +1552,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.get_client(device_id).insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.__default_client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1501,8 +1572,8 @@ class Session(object):
return False
connected = False
for i in range(1, self.RETRY_NUM + 1):
- if self.__transport is not None:
- self.__transport.close()
+ if self.__default_connection.transport is not None:
+ self.__default_connection.transport.close()
curr_host_index = random.randint(0, len(self.__hosts))
try_host_num = 0
for j in range(curr_host_index, len(self.__hosts)):
@@ -1515,7 +1586,10 @@ class Session(object):
j = -1
try_host_num += 1
try:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
+ self.__default_client = self.__default_connection.client
connected = True
except TTransport.TException as e:
continue
@@ -1524,6 +1598,37 @@ class Session(object):
break
return connected
+ def get_client(self, device_id):
+ if (
+ self.__enable_redirection
+ and len(self.__device_id_to_endpoint) != 0
+ and self.__device_id_to_endpoint.has_key(device_id)
+ ):
+ endpoint = self.__device_id_to_endpoint.get(device_id)
+ if self.__endpoint_to_connection.has_key(endpoint):
+ return self.__endpoint_to_connection.get(endpoint).client
+ return self.__default_client
+
+ def handle_redirection(self, device_id, endpoint):
+ if self.__enable_redirection:
+ if endpoint.ip is "0.0.0.0":
+ return
+ if (
+ not self.__device_id_to_endpoint.has_key(device_id)
+ or self.__device_id_to_endpoint.get(device_id) != endpoint
+ ):
+ self.__device_id_to_endpoint[device_id] = endpoint
+ if self.__endpoint_to_connection.has_key(endpoint):
+ connection = self.__endpoint_to_connection[endpoint]
+ else:
+ try:
+ connection = self.init_connection(endpoint)
+ except Exception:
+ connection = None
+ self.__endpoint_to_connection[endpoint] = connection
+ if connection is None:
+ self.__device_id_to_endpoint.pop(device_id)
+
def gen_insert_string_records_of_one_device_request(
self,
device_id,
@@ -1562,13 +1667,15 @@ class Session(object):
self.__session_id, template.get_name(), bytes_array
)
try:
- return Session.verify_success(self.__client.createSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.createSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createSchemaTemplate(request)
+ self.__default_client.createSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("create template fails because: ", e1)
@@ -1583,13 +1690,15 @@ class Session(object):
"""
request = TSDropSchemaTemplateReq(self.__session_id, template_name)
try:
- return Session.verify_success(self.__client.dropSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.dropSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.dropSchemaTemplate(request)
+ self.__default_client.dropSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("drop template fails because: ", e1)
@@ -1625,13 +1734,15 @@ class Session(object):
list(map(lambda x: x.value, compressors)),
)
try:
- return Session.verify_success(self.__client.appendSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.appendSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.appendSchemaTemplate(request)
+ self.__default_client.appendSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("append template fails because: ", e1)
@@ -1647,13 +1758,15 @@ class Session(object):
"""
request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
try:
- return Session.verify_success(self.__client.pruneSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.pruneSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.pruneSchemaTemplate(request)
+ self.__default_client.pruneSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("prune template fails because: ", e1)
@@ -1669,13 +1782,15 @@ class Session(object):
"""
request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
try:
- return Session.verify_success(self.__client.setSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.setSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.setSchemaTemplate(request)
+ self.__default_client.setSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("set template fails because: ", e1)
@@ -1694,13 +1809,15 @@ class Session(object):
self.__session_id, prefix_path, template_name
)
try:
- return Session.verify_success(self.__client.unsetSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.unsetSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.unsetSchemaTemplate(request)
+ self.__default_client.unsetSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("unset template fails because: ", e1)
@@ -1719,13 +1836,13 @@ class Session(object):
TemplateQueryType.COUNT_MEASUREMENTS.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
return response.count
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.count
except TTransport.TException as e1:
@@ -1749,14 +1866,14 @@ class Session(object):
path,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e1:
@@ -1778,14 +1895,14 @@ class Session(object):
self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e1:
@@ -1809,14 +1926,14 @@ class Session(object):
pattern,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1837,14 +1954,14 @@ class Session(object):
TemplateQueryType.SHOW_TEMPLATES.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1862,14 +1979,14 @@ class Session(object):
self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1889,14 +2006,14 @@ class Session(object):
TemplateQueryType.SHOW_USING_TEMPLATES.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1904,3 +2021,25 @@ class Session(object):
raise e1
else:
raise e
+
+
+class SessionConnection(object):
+ def __init__(
+ self,
+ client,
+ transport,
+ ):
+ self.client = client
+ self.transport = transport
+
+ def close_connection(self, req):
+ try:
+ self.client.closeSession(req)
+ except TTransport.TException as e:
+ logger.exception(
+ "Error occurs when closing session at server. Maybe server is down. Error message: ",
+ exc_info=e,
+ )
+ finally:
+ if self.transport is not None:
+ self.transport.close()