You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/01/18 03:13:08 UTC

[iotdb] branch add_python_interface created (now c7f3a06)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch add_python_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at c7f3a06  add interface without sort

This branch includes the following new commits:

     new c7f3a06  add interface without sort

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add interface without sort

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch add_python_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7f3a0609a013b2dfd5e17345e7d1ec83dde2750
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Jan 18 11:12:29 2021 +0800

    add interface without sort
---
 client-py/src/SessionExample.py              | 14 ++++++--
 client-py/src/iotdb/Session.py               | 53 ++++++++++++++++++++++++++--
 client-py/src/iotdb/utils/IoTDBRpcDataSet.py |  5 +--
 3 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
index 697d075..0bc08b5 100644
--- a/client-py/src/SessionExample.py
+++ b/client-py/src/SessionExample.py
@@ -89,14 +89,24 @@ tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_,
 tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
 session.insert_tablets([tablet_01, tablet_02])
 
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
+data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+                   [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list)
+
 # execute non-query sql statement
-session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
+session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)")
 
 # execute sql query statement
 session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
 session_data_set.set_fetch_size(1024)
 while session_data_set.has_next():
-    print(session_data_set.next())
+  print(session_data_set.next())
 session_data_set.close_operation_handle()
 
 # close session connection.
diff --git a/client-py/src/iotdb/Session.py b/client-py/src/iotdb/Session.py
index 7cb6e81..ce16c85 100644
--- a/client-py/src/iotdb/Session.py
+++ b/client-py/src/iotdb/Session.py
@@ -26,7 +26,8 @@ from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
 from .thrift.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
-     TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+    TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq, \
+    TSInsertRecordsOfOneDeviceReq
 from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
 
 # for debug
@@ -61,6 +62,7 @@ class Session(object):
         self.__session_id = None
         self.__statement_id = None
         self.__zone_id = zone_id
+        self.__default_timeout = 1000
 
     def open(self, enable_rpc_compression):
         if not self.__is_close:
@@ -293,7 +295,7 @@ class Session(object):
 
     def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
         if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
-           (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
+            (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
             print("deviceIds, times, measurementsList and valuesList's size should be equal")
             # could raise an error here.
             return
@@ -332,6 +334,50 @@ class Session(object):
         status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
         print("insert multiple tablets, message: {}".format(status.message))
 
+
+    def insert_records_of_one_device(self, device_id, times_list, measurements_list, types_list, values_list):
+        print("here")
+        self.insert_records_of_one_device_sorted(device_id, times_list, measurements_list, types_list, values_list)
+
+    def insert_records_of_one_device_sorted(self, device_id, times_list, measurements_list, types_list, values_list):
+        """
+        Insert multiple rows in one request, which reduces network overhead. This method is similar to
+        JDBC's executeBatch: several insert requests are packed into a batch and sent to the server together.
+        If you want to improve performance further, please see the insertTablet method.
+
+        :param device_id: device id
+        :param times_list: timestamps list
+        :param measurements_list: measurements list
+        :param types_list: types list
+        :param values_list: values list
+        :param have_sorted: have these list been sorted by timestamp
+        """
+        # check parameter
+        size = len(times_list)
+        if (size != len(measurements_list) or size != len(types_list) or size != len(values_list)):
+            print("types, times, measurementsList and valuesList's size should be equal")
+            return
+
+        request = self.gen_insert_records_of_one_device_request(device_id, times_list, measurements_list, values_list, types_list)
+
+        # send request
+        status = self.__client.insertRecordsOfOneDevice(request)
+        print("insert records of one device, message: {}".format(status.message))
+
+    def gen_insert_records_of_one_device_request(self, device_id, times_list, measurements_list, values_list, types_list):
+        binary_value_list = []
+        for values, data_types, measurements in zip(values_list, types_list, measurements_list):
+            data_types = [data_type.value for data_type in data_types]
+            if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+                print("deviceIds, times, measurementsList and valuesList's size should be equal")
+                # could raise an error here.
+                return
+            values_in_bytes = Session.value_to_bytes(data_types, values)
+            binary_value_list.append(values_in_bytes)
+
+        return TSInsertRecordsOfOneDeviceReq(self.__session_id, device_id, measurements_list, binary_value_list, times_list)
+
+
     def test_insert_tablet(self, tablet):
         """
          this method NOT insert data into database and the server just return after accept the request, this method
@@ -380,7 +426,7 @@ class Session(object):
         :param sql: String, query sql statement
         :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
         """
-        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
+        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size, self.__default_timeout)
         resp = self.__client.executeQueryStatement(request)
         return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
                               self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)
@@ -403,6 +449,7 @@ class Session(object):
         format_str_list = [">"]
         values_tobe_packed = []
         for data_type, value in zip(data_types, values):
+            print(data_type, TSDataType.BOOLEAN.value)
             if data_type == TSDataType.BOOLEAN.value:
                 format_str_list.append("h")
                 format_str_list.append("?")
diff --git a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
index 21c2f0e..6830488 100644
--- a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
@@ -38,7 +38,7 @@ class IoTDBRpcDataSet(object):
     FLAG = 0x80
 
     def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
-                 client, session_id, query_data_set, fetch_size):
+        client, session_id, query_data_set, fetch_size):
         self.__session_id = session_id
         self.__ignore_timestamp = ignore_timestamp
         self.__sql = sql
@@ -46,6 +46,7 @@ class IoTDBRpcDataSet(object):
         self.__client = client
         self.__fetch_size = fetch_size
         self.__column_size = len(column_name_list)
+        self.__default_time_out = 1000
 
         self.__column_name_list = []
         self.__column_type_list = []
@@ -157,7 +158,7 @@ class IoTDBRpcDataSet(object):
 
     def fetch_results(self):
         self.__rows_index = 0
-        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
+        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True, self.__default_time_out)
         try:
             resp = self.__client.fetchResults(request)
             if not resp.hasResultSet: