You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/27 10:56:41 UTC
[iotdb] branch python_cache_leader created (now d21dccfb53)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a change to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at d21dccfb53 [Draft] python cache leader
This branch includes the following new commits:
new d21dccfb53 [Draft] python cache leader
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: [Draft] python cache leader
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch python_cache_leader
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d21dccfb531e30069132249fb41d0c6dcf507942
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Mar 27 18:52:38 2023 +0800
[Draft] python cache leader
---
client-py/iotdb/Session.py | 383 ++++++++++++++++++++++++++++++---------------
1 file changed, 261 insertions(+), 122 deletions(-)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 5a7cf47736..ac2e7a6f7f 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -91,6 +91,7 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
self.__host = host
self.__port = port
@@ -101,13 +102,16 @@ class Session(object):
self.__password = password
self.__fetch_size = fetch_size
self.__is_close = True
- self.__transport = None
- self.__client = None
+ self.__default_client = None
+ self.__default_connection = None
self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
self.__enable_rpc_compression = None
+ self.__enable_redirection = enable_redirection
+ self.__device_id_to_endpoint = None
+ self.__endpoint_to_connection = None
@classmethod
def init_from_node_urls(
@@ -117,10 +121,13 @@ class Session(object):
password=DEFAULT_PASSWORD,
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
+ enable_redirection=True,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
- session = Session(None, None, user, password, fetch_size, zone_id)
+ session = Session(
+ None, None, user, password, fetch_size, zone_id, enable_redirection
+ )
session.__hosts = []
session.__ports = []
for node_url in node_urls:
@@ -137,34 +144,45 @@ class Session(object):
return
self.__enable_rpc_compression = enable_rpc_compression
if self.__hosts is None:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(self.__default_endpoint)
+ self.__default_client = self.__default_connection.client
else:
for i in range(0, len(self.__hosts)):
self.__default_endpoint = TEndPoint(self.__hosts[i], self.__ports[i])
try:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
+ self.__default_client = self.__default_connection.client
except Exception as e:
if not self.reconnect():
logger.error("Cluster has no nodes to connect")
raise e
break
+ self.__is_close = False
+ if self.__enable_redirection:
+ self.__device_id_to_endpoint = {}
+ self.__endpoint_to_connection = {
+ self.__default_endpoint: self.__default_connection
+ }
def init_connection(self, endpoint):
- self.__transport = TTransport.TFramedTransport(
+ transport = TTransport.TFramedTransport(
TSocket.TSocket(endpoint.ip, endpoint.port)
)
- if not self.__transport.isOpen():
+ if not transport.isOpen():
try:
- self.__transport.open()
+ transport.open()
except TTransport.TTransportException as e:
raise e
if self.__enable_rpc_compression:
- self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+ client = Client(TCompactProtocol.TCompactProtocol(transport))
else:
- self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+ client = Client(TBinaryProtocol.TBinaryProtocol(transport))
+ connection = SessionConnection(client, transport)
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
username=self.__user,
@@ -174,7 +192,8 @@ class Session(object):
)
try:
- open_resp = self.__client.openSession(open_req)
+ open_resp = client.openSession(open_req)
+ Session.verify_success(open_resp.status)
if self.protocol_version != open_resp.serverProtocolVersion:
logger.exception(
@@ -187,19 +206,23 @@ class Session(object):
raise TTransport.TException(message="Protocol not supported.")
self.__session_id = open_resp.sessionId
- self.__statement_id = self.__client.requestStatementId(self.__session_id)
+ self.__statement_id = client.requestStatementId(self.__session_id)
except Exception as e:
- self.__transport.close()
+ transport.close()
logger.exception("session closed because: ", exc_info=e)
raise e
if self.__zone_id is not None:
- self.set_time_zone(self.__zone_id)
+ request = TSSetTimeZoneReq(self.__session_id, self.__zone_id)
+ try:
+ client.setTimeZone(request)
+ except TTransport.TException as e:
+ raise RuntimeError("Could not set time zone because: ", e)
else:
self.__zone_id = self.get_time_zone()
- self.__is_close = False
+ return connection
def is_open(self):
return not self.__is_close
@@ -209,16 +232,13 @@ class Session(object):
return
req = TSCloseSessionReq(self.__session_id)
try:
- self.__client.closeSession(req)
- except TTransport.TException as e:
- logger.exception(
- "Error occurs when closing session at server. Maybe server is down. Error message: ",
- exc_info=e,
- )
+ if self.__enable_redirection:
+ for connection in self.__endpoint_to_connection.values():
+ connection.close_connection(req)
+ else:
+ self.__default_connection.close_connection(req)
finally:
self.__is_close = True
- if self.__transport is not None:
- self.__transport.close()
def set_storage_group(self, group_name):
"""
@@ -227,13 +247,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.setStorageGroup(self.__session_id, group_name)
+ self.__default_client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.setStorageGroup(self.__session_id, group_name)
+ self.__default_client.setStorageGroup(
+ self.__session_id, group_name
+ )
)
except TTransport.TException as e1:
logger.exception("create databases fails because: ", e1)
@@ -256,13 +278,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
+ self.__default_client.deleteStorageGroups(
+ self.__session_id, storage_group_lst
+ )
)
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.deleteStorageGroups(
+ self.__default_client.deleteStorageGroups(
self.__session_id, storage_group_lst
)
)
@@ -309,13 +333,15 @@ class Session(object):
alias,
)
try:
- return Session.verify_success(self.__client.createTimeseries(request))
+ return Session.verify_success(
+ self.__default_client.createTimeseries(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createTimeseries(request)
+ self.__default_client.createTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating time series fails because: ", e1)
@@ -348,14 +374,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.createAlignedTimeseries(request)
+ self.__default_client.createAlignedTimeseries(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createAlignedTimeseries(request)
+ self.__default_client.createAlignedTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating time series fails because: ", e1)
@@ -400,13 +426,15 @@ class Session(object):
alias_lst,
)
try:
- return Session.verify_success(self.__client.createMultiTimeseries(request))
+ return Session.verify_success(
+ self.__default_client.createMultiTimeseries(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createMultiTimeseries(request)
+ self.__default_client.createMultiTimeseries(request)
)
except TTransport.TException as e1:
logger.exception("creating multi time series fails because: ", e1)
@@ -421,13 +449,15 @@ class Session(object):
"""
try:
return Session.verify_success(
- self.__client.deleteTimeseries(self.__session_id, paths_list)
+ self.__default_client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.deleteTimeseries(self.__session_id, paths_list)
+ self.__default_client.deleteTimeseries(
+ self.__session_id, paths_list
+ )
)
except TTransport.TException as e1:
logger.exception("deleting time series fails because: ", e1)
@@ -454,12 +484,14 @@ class Session(object):
self.__session_id, paths_list, -9223372036854775808, end_time
)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(self.__default_client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(
+ self.__default_client.deleteData(request)
+ )
except TTransport.TException as e1:
logger.exception("data deletion fails because: ", e1)
raise e1
@@ -475,12 +507,14 @@ class Session(object):
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(self.__default_client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return Session.verify_success(
+ self.__default_client.deleteData(request)
+ )
except TTransport.TException as e1:
logger.exception("data deletion fails because: ", e1)
raise e1
@@ -497,13 +531,15 @@ class Session(object):
device_id, timestamp, measurements, string_values
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertStringRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecord(request)
+ self.__default_client.insertStringRecord(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -523,13 +559,15 @@ class Session(object):
device_id, timestamp, measurements, string_values, True
)
try:
- return Session.verify_success(self.__client.insertStringRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertStringRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecord(request)
+ self.__default_client.insertStringRecord(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -554,12 +592,16 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.__default_client.insertRecord(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -586,12 +628,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(self.__default_client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(
+ self.__default_client.insertRecords(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -617,12 +661,16 @@ class Session(object):
device_id, timestamp, measurements, data_types, values, True
)
try:
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).insertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return Session.verify_success(
+ self.__default_client.insertRecord(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -649,12 +697,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst, True
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(self.__default_client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecords(request))
+ return Session.verify_success(
+ self.__default_client.insertRecords(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -678,12 +728,14 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.testInsertRecord(request))
+ return Session.verify_success(
+ self.get_client(device_id).testInsertRecord(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.testInsertRecord(request)
+ self.__default_client.testInsertRecord(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -711,12 +763,14 @@ class Session(object):
device_ids, times, measurements_lst, type_values_lst, values_lst
)
try:
- return Session.verify_success(self.__client.testInsertRecords(request))
+ return Session.verify_success(
+ self.__default_client.testInsertRecords(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
return Session.verify_success(
- self.__client.testInsertRecords(request)
+ self.__default_client.testInsertRecords(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -805,12 +859,16 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).insertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.__default_client.insertTablet(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -824,12 +882,14 @@ class Session(object):
"""
request = self.gen_insert_tablets_req(tablet_lst)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(self.__default_client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(
+ self.__default_client.insertTablets(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -850,12 +910,16 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet, True)
try:
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).insertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return Session.verify_success(
+ self.__default_client.insertTablet(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -869,12 +933,14 @@ class Session(object):
"""
request = self.gen_insert_tablets_req(tablet_lst, True)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(self.__default_client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablets(request))
+ return Session.verify_success(
+ self.__default_client.insertTablets(request)
+ )
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
raise e1
@@ -934,14 +1000,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.get_client(device_id).insertRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.__default_client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1003,14 +1069,14 @@ class Session(object):
# send request
try:
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.get_client(device_id).insertRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertRecordsOfOneDevice(request)
+ self.__default_client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1054,15 +1120,17 @@ class Session(object):
should be used to test other time cost in client
:param tablet: a tablet of data
"""
+ request = self.gen_insert_tablet_req(tablet)
try:
- request = self.gen_insert_tablet_req(tablet)
- return Session.verify_success(self.__client.testInsertTablet(request))
+ return Session.verify_success(
+ self.get_client(tablet.get_device_id()).testInsertTablet(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.testInsertTablet(request)
+ self.__default_client.testInsertTablet(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -1076,14 +1144,17 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
+ request = self.gen_insert_tablets_req(tablet_list)
try:
- request = self.gen_insert_tablets_req(tablet_list)
- return Session.verify_success(self.__client.testInsertTablets(request))
+ return Session.verify_success(
+ self.__default_client.testInsertTablets(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
+ request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.testInsertTablets(request)
+ self.__default_client.testInsertTablets(request)
)
except TTransport.TException as e1:
logger.exception("test insert fails because: ", e1)
@@ -1142,13 +1213,13 @@ class Session(object):
self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout
)
try:
- resp = self.__client.executeQueryStatement(request)
+ resp = self.__default_client.executeQueryStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeQueryStatement(request)
+ resp = self.__default_client.executeQueryStatement(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1161,7 +1232,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1175,13 +1246,13 @@ class Session(object):
"""
request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
try:
- resp = self.__client.executeUpdateStatement(request)
+ resp = self.__default_client.executeUpdateStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeUpdateStatement(request)
+ resp = self.__default_client.executeUpdateStatement(request)
except TTransport.TException as e1:
logger.exception(
"execution of non-query statement fails because: ", e1
@@ -1196,13 +1267,13 @@ class Session(object):
self.__session_id, sql, self.__statement_id, timeout
)
try:
- resp = self.__client.executeStatement(request)
+ resp = self.__default_client.executeStatement(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeStatement(request)
+ resp = self.__default_client.executeStatement(request)
except TTransport.TException as e1:
logger.exception("execution of statement fails because: ", e1)
raise e1
@@ -1216,7 +1287,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1276,7 +1347,7 @@ class Session(object):
if self.__zone_id is not None:
return self.__zone_id
try:
- resp = self.__client.getTimeZone(self.__session_id)
+ resp = self.__default_client.getTimeZone(self.__session_id)
except TTransport.TException as e:
raise RuntimeError("Could not get time zone because: ", e)
return resp.timeZone
@@ -1284,7 +1355,7 @@ class Session(object):
def set_time_zone(self, zone_id):
request = TSSetTimeZoneReq(self.__session_id, zone_id)
try:
- status = self.__client.setTimeZone(request)
+ status = self.__default_client.setTimeZone(request)
logger.debug(
"setting time zone_id as {}, message: {}".format(
zone_id, status.message
@@ -1354,13 +1425,13 @@ class Session(object):
enableRedirectQuery=False,
)
try:
- resp = self.__client.executeRawDataQuery(request)
+ resp = self.__default_client.executeRawDataQuery(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeRawDataQuery(request)
+ resp = self.__default_client.executeRawDataQuery(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1373,7 +1444,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1396,13 +1467,13 @@ class Session(object):
enableRedirectQuery=False,
)
try:
- resp = self.__client.executeLastDataQuery(request)
+ resp = self.__default_client.executeLastDataQuery(request)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
request.statementId = self.__statement_id
- resp = self.__client.executeLastDataQuery(request)
+ resp = self.__default_client.executeLastDataQuery(request)
except TTransport.TException as e1:
logger.exception("execution of query statement fails because: ", e1)
raise e1
@@ -1415,7 +1486,7 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
+ self.__default_client,
self.__statement_id,
self.__session_id,
resp.queryDataSet,
@@ -1449,14 +1520,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.get_client(device_id).insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.__default_client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1481,14 +1552,14 @@ class Session(object):
)
try:
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.get_client(device_id).insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.insertStringRecordsOfOneDevice(request)
+ self.__default_client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
logger.exception("insert fails because: ", e1)
@@ -1501,8 +1572,8 @@ class Session(object):
return False
connected = False
for i in range(1, self.RETRY_NUM + 1):
- if self.__transport is not None:
- self.__transport.close()
+ if self.__default_connection.transport is not None:
+ self.__default_connection.transport.close()
curr_host_index = random.randint(0, len(self.__hosts))
try_host_num = 0
for j in range(curr_host_index, len(self.__hosts)):
@@ -1515,7 +1586,10 @@ class Session(object):
j = -1
try_host_num += 1
try:
- self.init_connection(self.__default_endpoint)
+ self.__default_connection = self.init_connection(
+ self.__default_endpoint
+ )
+ self.__default_client = self.__default_connection.client
connected = True
except TTransport.TException as e:
continue
@@ -1524,6 +1598,37 @@ class Session(object):
break
return connected
+ def get_client(self, device_id):
+ if (
+ self.__enable_redirection
+ and len(self.__device_id_to_endpoint) != 0
+ and self.__device_id_to_endpoint.has_key(device_id)
+ ):
+ endpoint = self.__device_id_to_endpoint.get(device_id)
+ if self.__endpoint_to_connection.has_key(endpoint):
+ return self.__endpoint_to_connection.get(endpoint).client
+ return self.__default_client
+
+ def handle_redirection(self, device_id, endpoint):
+ if self.__enable_redirection:
+ if endpoint.ip is "0.0.0.0":
+ return
+ if (
+ not self.__device_id_to_endpoint.has_key(device_id)
+ or self.__device_id_to_endpoint.get(device_id) != endpoint
+ ):
+ self.__device_id_to_endpoint[device_id] = endpoint
+ if self.__endpoint_to_connection.has_key(endpoint):
+ connection = self.__endpoint_to_connection[endpoint]
+ else:
+ try:
+ connection = self.init_connection(endpoint)
+ except Exception:
+ connection = None
+ self.__endpoint_to_connection[endpoint] = connection
+ if connection is None:
+ self.__device_id_to_endpoint.pop(device_id)
+
def gen_insert_string_records_of_one_device_request(
self,
device_id,
@@ -1562,13 +1667,15 @@ class Session(object):
self.__session_id, template.get_name(), bytes_array
)
try:
- return Session.verify_success(self.__client.createSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.createSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.createSchemaTemplate(request)
+ self.__default_client.createSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("create template fails because: ", e1)
@@ -1583,13 +1690,15 @@ class Session(object):
"""
request = TSDropSchemaTemplateReq(self.__session_id, template_name)
try:
- return Session.verify_success(self.__client.dropSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.dropSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.dropSchemaTemplate(request)
+ self.__default_client.dropSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("drop template fails because: ", e1)
@@ -1625,13 +1734,15 @@ class Session(object):
list(map(lambda x: x.value, compressors)),
)
try:
- return Session.verify_success(self.__client.appendSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.appendSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.appendSchemaTemplate(request)
+ self.__default_client.appendSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("append template fails because: ", e1)
@@ -1647,13 +1758,15 @@ class Session(object):
"""
request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
try:
- return Session.verify_success(self.__client.pruneSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.pruneSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.pruneSchemaTemplate(request)
+ self.__default_client.pruneSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("prune template fails because: ", e1)
@@ -1669,13 +1782,15 @@ class Session(object):
"""
request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
try:
- return Session.verify_success(self.__client.setSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.setSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.setSchemaTemplate(request)
+ self.__default_client.setSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("set template fails because: ", e1)
@@ -1694,13 +1809,15 @@ class Session(object):
self.__session_id, prefix_path, template_name
)
try:
- return Session.verify_success(self.__client.unsetSchemaTemplate(request))
+ return Session.verify_success(
+ self.__default_client.unsetSchemaTemplate(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
return Session.verify_success(
- self.__client.unsetSchemaTemplate(request)
+ self.__default_client.unsetSchemaTemplate(request)
)
except TTransport.TException as e1:
logger.exception("unset template fails because: ", e1)
@@ -1719,13 +1836,13 @@ class Session(object):
TemplateQueryType.COUNT_MEASUREMENTS.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
return response.count
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.count
except TTransport.TException as e1:
@@ -1749,14 +1866,14 @@ class Session(object):
path,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e1:
@@ -1778,14 +1895,14 @@ class Session(object):
self.__session_id, template_name, TemplateQueryType.PATH_EXIST.value, path
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.result
except TTransport.TException as e1:
@@ -1809,14 +1926,14 @@ class Session(object):
pattern,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1837,14 +1954,14 @@ class Session(object):
TemplateQueryType.SHOW_TEMPLATES.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1862,14 +1979,14 @@ class Session(object):
self.__session_id, template_name, TemplateQueryType.SHOW_SET_TEMPLATES.value
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1889,14 +2006,14 @@ class Session(object):
TemplateQueryType.SHOW_USING_TEMPLATES.value,
)
try:
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- response = self.__client.querySchemaTemplate(request)
+ response = self.__default_client.querySchemaTemplate(request)
Session.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
@@ -1904,3 +2021,25 @@ class Session(object):
raise e1
else:
raise e
+
+
+ # Bundles a client together with the transport it talks over, so that one object
+ # can be used both to issue calls (client) and to shut the link down (transport).
+ class SessionConnection(object):
+ def __init__(
+ self,
+ client,
+ transport,
+ ):
+ # client: object whose closeSession() is invoked by close_connection
+ # transport: object whose close() is invoked by close_connection; may be None
+ self.client = client
+ self.transport = transport
+
+ def close_connection(self, req):
+ # Sends req to the server via closeSession. A TTransport.TException raised by
+ # that call is logged rather than propagated, and the transport (when set)
+ # is closed whether or not the call succeeded.
+ try:
+ self.client.closeSession(req)
+ except TTransport.TException as e:
+ logger.exception(
+ "Error occurs when closing session at server. Maybe server is down. Error message: ",
+ exc_info=e,
+ )
+ finally:
+ if self.transport is not None:
+ self.transport.close()