You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/01/18 03:13:09 UTC

[iotdb] 01/01: add interface without sort

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch add_python_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7f3a0609a013b2dfd5e17345e7d1ec83dde2750
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Jan 18 11:12:29 2021 +0800

    add interface without sort
---
 client-py/src/SessionExample.py              | 14 ++++++--
 client-py/src/iotdb/Session.py               | 53 ++++++++++++++++++++++++++--
 client-py/src/iotdb/utils/IoTDBRpcDataSet.py |  5 +--
 3 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
index 697d075..0bc08b5 100644
--- a/client-py/src/SessionExample.py
+++ b/client-py/src/SessionExample.py
@@ -89,14 +89,24 @@ tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_,
 tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
 session.insert_tablets([tablet_01, tablet_02])
 
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
+data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list)
+
 # execute non-query sql statement
-session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
+session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)")
 
 # execute sql query statement
 session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
 session_data_set.set_fetch_size(1024)
 while session_data_set.has_next():
-    print(session_data_set.next())
+  print(session_data_set.next())
 session_data_set.close_operation_handle()
 
 # close session connection.
diff --git a/client-py/src/iotdb/Session.py b/client-py/src/iotdb/Session.py
index 7cb6e81..ce16c85 100644
--- a/client-py/src/iotdb/Session.py
+++ b/client-py/src/iotdb/Session.py
@@ -26,7 +26,8 @@ from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
 from .thrift.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
-     TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+    TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq, \
+    TSInsertRecordsOfOneDeviceReq
 from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
 
 # for debug
@@ -61,6 +62,7 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__default_timeout = 1000
 
     def open(self, enable_rpc_compression):
         if not self.__is_close:
@@ -293,7 +295,7 @@ class Session(object):
 
     def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
         if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
-           (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
+            (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
             print("deviceIds, times, measurementsList and valuesList's size should be equal")
             # could raise an error here.
             return
@@ -332,6 +334,50 @@ class Session(object):
         status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
         print("insert multiple tablets, message: {}".format(status.message))
 
+
+    def insert_records_of_one_device(self, device_id, times_list, measurements_list, types_list, values_list):
+        print("here")
+        self.insert_records_of_one_device_sorted(device_id, times_list, measurements_list, types_list, values_list)
+
+    def insert_records_of_one_device_sorted(self, device_id, times_list, measurements_list, types_list, values_list):
+        """
+        Insert multiple rows in one request, which can reduce network overhead. This method is like JDBC
+        executeBatch: several insert requests are packed into a batch and sent to the server. If you want
+        to improve performance further, please see the insertTablet method.
+
+        :param device_id: device id
+        :param times_list: timestamps list
+        :param measurements_list: measurements list
+        :param types_list: types list
+        :param values_list: values list
+        :param have_sorted: have these list been sorted by timestamp
+        """
+        # check parameter
+        size = len(times_list)
+        if (size != len(measurements_list) or size != len(types_list) or size != len(values_list)):
+            print("types, times, measurementsList and valuesList's size should be equal")
+            return
+
+        request = self.gen_insert_records_of_one_device_request(device_id, times_list, measurements_list, values_list, types_list)
+
+        # send request
+        status = self.__client.insertRecordsOfOneDevice(request)
+        print("insert records of one device, message: {}".format(status.message))
+
+    def gen_insert_records_of_one_device_request(self, device_id, times_list, measurements_list, values_list, types_list):
+        binary_value_list = []
+        for values, data_types, measurements in zip(values_list, types_list, measurements_list):
+            data_types = [data_type.value for data_type in data_types]
+            if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+                print("deviceIds, times, measurementsList and valuesList's size should be equal")
+                # could raise an error here.
+                return
+            values_in_bytes = Session.value_to_bytes(data_types, values)
+            binary_value_list.append(values_in_bytes)
+
+        return TSInsertRecordsOfOneDeviceReq(self.__session_id, device_id, measurements_list, binary_value_list, times_list)
+
+
     def test_insert_tablet(self, tablet):
         """
          this method NOT insert data into database and the server just return after accept the request, this method
@@ -380,7 +426,7 @@ class Session(object):
         :param sql: String, query sql statement
         :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
         """
-        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
+        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size, self.__default_timeout)
         resp = self.__client.executeQueryStatement(request)
         return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
                               self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)
@@ -403,6 +449,7 @@ class Session(object):
         format_str_list = [">"]
         values_tobe_packed = []
         for data_type, value in zip(data_types, values):
+            print(data_type, TSDataType.BOOLEAN.value)
             if data_type == TSDataType.BOOLEAN.value:
                 format_str_list.append("h")
                 format_str_list.append("?")
diff --git a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
index 21c2f0e..6830488 100644
--- a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
@@ -38,7 +38,7 @@ class IoTDBRpcDataSet(object):
     FLAG = 0x80
 
     def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
-                 client, session_id, query_data_set, fetch_size):
+        client, session_id, query_data_set, fetch_size):
         self.__session_id = session_id
         self.__ignore_timestamp = ignore_timestamp
         self.__sql = sql
@@ -46,6 +46,7 @@ class IoTDBRpcDataSet(object):
         self.__client = client
         self.__fetch_size = fetch_size
         self.__column_size = len(column_name_list)
+        self.__default_time_out = 1000
 
         self.__column_name_list = []
         self.__column_type_list = []
@@ -157,7 +158,7 @@ class IoTDBRpcDataSet(object):
 
     def fetch_results(self):
         self.__rows_index = 0
-        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
+        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True, self.__default_time_out)
         try:
             resp = self.__client.fetchResults(request)
             if not resp.hasResultSet: