You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/01/18 03:13:09 UTC
[iotdb] 01/01: add interface without sort
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch add_python_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c7f3a0609a013b2dfd5e17345e7d1ec83dde2750
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Jan 18 11:12:29 2021 +0800
add interface without sort
---
client-py/src/SessionExample.py | 14 ++++++--
client-py/src/iotdb/Session.py | 53 ++++++++++++++++++++++++++--
client-py/src/iotdb/utils/IoTDBRpcDataSet.py | 5 +--
3 files changed, 65 insertions(+), 7 deletions(-)
diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
index 697d075..0bc08b5 100644
--- a/client-py/src/SessionExample.py
+++ b/client-py/src/SessionExample.py
@@ -89,14 +89,24 @@ tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_,
tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
session.insert_tablets([tablet_01, tablet_02])
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
+data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list)
+
# execute non-query sql statement
-session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
+session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)")
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
- print(session_data_set.next())
+ print(session_data_set.next())
session_data_set.close_operation_handle()
# close session connection.
diff --git a/client-py/src/iotdb/Session.py b/client-py/src/iotdb/Session.py
index 7cb6e81..ce16c85 100644
--- a/client-py/src/iotdb/Session.py
+++ b/client-py/src/iotdb/Session.py
@@ -26,7 +26,8 @@ from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport
from .thrift.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
- TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+ TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq, \
+ TSInsertRecordsOfOneDeviceReq
from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
# for debug
@@ -61,6 +62,7 @@ class Session(object):
self.__session_id = None
self.__statement_id = None
self.__zone_id = zone_id
+ self.__default_timeout = 1000
def open(self, enable_rpc_compression):
if not self.__is_close:
@@ -293,7 +295,7 @@ class Session(object):
def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
- (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
+ (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
print("deviceIds, times, measurementsList and valuesList's size should be equal")
# could raise an error here.
return
@@ -332,6 +334,50 @@ class Session(object):
status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
print("insert multiple tablets, message: {}".format(status.message))
+
+ def insert_records_of_one_device(self, device_id, times_list, measurements_list, types_list, values_list):
+ print("here")
+ self.insert_records_of_one_device_sorted(device_id, times_list, measurements_list, types_list, values_list)
+
+ def insert_records_of_one_device_sorted(self, device_id, times_list, measurements_list, types_list, values_list):
+ """
+ Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+ executeBatch, we pack some insert request in batch and send them to server. If you want improve
+ your performance, please see insertTablet method
+
+ :param device_id: device id
+ :param times_list: timestamps list
+ :param measurements_list: measurements list
+ :param types_list: types list
+ :param values_list: values list
+ :param have_sorted: have these list been sorted by timestamp
+ """
+ # check parameter
+ size = len(times_list)
+ if (size != len(measurements_list) or size != len(types_list) or size != len(values_list)):
+ print("types, times, measurementsList and valuesList's size should be equal")
+ return
+
+ request = self.gen_insert_records_of_one_device_request(device_id, times_list, measurements_list, values_list, types_list)
+
+ # send request
+ status = self.__client.insertRecordsOfOneDevice(request)
+ print("insert records of one device, message: {}".format(status.message))
+
+ def gen_insert_records_of_one_device_request(self, device_id, times_list, measurements_list, values_list, types_list):
+ binary_value_list = []
+ for values, data_types, measurements in zip(values_list, types_list, measurements_list):
+ data_types = [data_type.value for data_type in data_types]
+ if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+ print("deviceIds, times, measurementsList and valuesList's size should be equal")
+ # could raise an error here.
+ return
+ values_in_bytes = Session.value_to_bytes(data_types, values)
+ binary_value_list.append(values_in_bytes)
+
+ return TSInsertRecordsOfOneDeviceReq(self.__session_id, device_id, measurements_list, binary_value_list, times_list)
+
+
def test_insert_tablet(self, tablet):
"""
this method NOT insert data into database and the server just return after accept the request, this method
@@ -380,7 +426,7 @@ class Session(object):
:param sql: String, query sql statement
:return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
"""
- request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
+ request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size, self.__default_timeout)
resp = self.__client.executeQueryStatement(request)
return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)
@@ -403,6 +449,7 @@ class Session(object):
format_str_list = [">"]
values_tobe_packed = []
for data_type, value in zip(data_types, values):
+ print(data_type, TSDataType.BOOLEAN.value)
if data_type == TSDataType.BOOLEAN.value:
format_str_list.append("h")
format_str_list.append("?")
diff --git a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
index 21c2f0e..6830488 100644
--- a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
@@ -38,7 +38,7 @@ class IoTDBRpcDataSet(object):
FLAG = 0x80
def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
- client, session_id, query_data_set, fetch_size):
+ client, session_id, query_data_set, fetch_size):
self.__session_id = session_id
self.__ignore_timestamp = ignore_timestamp
self.__sql = sql
@@ -46,6 +46,7 @@ class IoTDBRpcDataSet(object):
self.__client = client
self.__fetch_size = fetch_size
self.__column_size = len(column_name_list)
+ self.__default_time_out = 1000
self.__column_name_list = []
self.__column_type_list = []
@@ -157,7 +158,7 @@ class IoTDBRpcDataSet(object):
def fetch_results(self):
self.__rows_index = 0
- request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
+ request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True, self.__default_time_out)
try:
resp = self.__client.fetchResults(request)
if not resp.hasResultSet: