You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/21 13:09:08 UTC
[incubator-iotdb] 01/04: python session client ver-0.10.0
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 239c7dcb28cd9ce2d74ea0e6ecf270d8fcbde629
Author: Zekun Li <lz...@gmail.com>
AuthorDate: Mon Jul 20 14:16:45 2020 +0800
python session client ver-0.10.0
---
client-py/src/Session.py | 460 +++++++++++++++++++++++++++++++++
client-py/src/SessionExample.py | 105 ++++++++
client-py/src/SessionUT.py | 107 ++++++++
client-py/src/utils/Field.py | 176 +++++++++++++
client-py/src/utils/IoTDBConstants.py | 52 ++++
client-py/src/utils/IoTDBRpcDataSet.py | 219 ++++++++++++++++
client-py/src/utils/RowRecord.py | 55 ++++
client-py/src/utils/SessionDataSet.py | 101 ++++++++
client-py/src/utils/Tablet.py | 133 ++++++++++
client-py/src/utils/__init__.py | 18 ++
10 files changed, 1426 insertions(+)
diff --git a/client-py/src/Session.py b/client-py/src/Session.py
new file mode 100644
index 0000000..c43f274
--- /dev/null
+++ b/client-py/src/Session.py
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import struct
+
+# if you run compile.sh, you can use the following code:
+sys.path.append("../target")
+
+# if you use maven to compile the thrift api, just use the following code:
+# sys.path.append("../../thrift/target/generated-sources-python")
+
+sys.path.append("./utils")
+
+from IoTDBConstants import *
+from SessionDataSet import SessionDataSet
+
+from thrift.protocol import TBinaryProtocol, TCompactProtocol
+from thrift.transport import TSocket, TTransport
+
+from iotdb.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
+ TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
+ TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
+
+
+class Session(object):
+ DEFAULT_FETCH_SIZE = 10000
+ DEFAULT_USER = 'root'
+ DEFAULT_PASSWORD = 'root'
+
+ def __init__(self, host, port, user=DEFAULT_USER, password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE):
+ self.__host = host
+ self.__port = port
+ self.__user = user
+ self.__password = password
+ self.__fetch_size = fetch_size
+ self.__is_close = True
+ self.__transport = None
+ self.__client = None
+ self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2
+ self.__session_id = None
+ self.__statement_id = None
+ self.__zone_id = None
+
+ def open(self, enable_rpc_compression):
+ if not self.__is_close:
+ return
+ self.__transport = TTransport.TBufferedTransport(TSocket.TSocket(self.__host, self.__port))
+
+ if not self.__transport.isOpen():
+ try:
+ self.__transport.open()
+ except TTransport.TTransportException as e:
+ print('TTransportException: ', e)
+
+ if enable_rpc_compression:
+ self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
+ else:
+ self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
+
+ open_req = TSOpenSessionReq(client_protocol=self.protocol_version,
+ username=self.__user,
+ password=self.__password)
+
+ try:
+ open_resp = self.__client.openSession(open_req)
+
+ if self.protocol_version != open_resp.serverProtocolVersion:
+ print("Protocol differ, Client version is {}, but Server version is {}".format(
+ self.protocol_version, open_resp.serverProtocolVersion))
+ # version is less than 0.10
+ if open_resp.serverProtocolVersion == 0:
+ raise TTransport.TException(message="Protocol not supported.")
+
+ self.__session_id = open_resp.sessionId
+ self.__statement_id = self.__client.requestStatementId(self.__session_id)
+
+ except Exception as e:
+ self.__transport.close()
+ print("session closed because: ", e)
+
+ if self.__zone_id is not None:
+ self.set_time_zone(self.__zone_id)
+ else:
+ self.__zone_id = self.get_time_zone()
+
+ self.__is_close = False
+
+ def close(self):
+ if self.__is_close:
+ return
+ req = TSCloseSessionReq(self.__session_id)
+ try:
+ self.__client.closeSession(req)
+ except TTransport.TException as e:
+ print("Error occurs when closing session at server. Maybe server is down. Error message: ", e)
+ finally:
+ self.__is_close = True
+ if self.__transport is not None:
+ self.__transport.close()
+
+ def set_storage_group(self, group_name):
+ """
+ set one storage group
+ :param group_name: String, storage group name (starts from root)
+ """
+ status = self.__client.setStorageGroup(self.__session_id, group_name)
+ print("setting storage group {} message: {}".format(group_name, status.message))
+
+ def delete_storage_group(self, storage_group):
+ """
+ delete one storage group.
+ :param storage_group: String, path of the target storage group.
+ """
+ groups = [storage_group]
+ self.delete_storage_groups(groups)
+
+ def delete_storage_groups(self, storage_group_lst):
+ """
+ delete multiple storage groups.
+ :param storage_group_lst: List, paths of the target storage groups.
+ """
+ status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
+ print("delete storage group(s) {} message: {}".format(storage_group_lst, status.message))
+
+ def create_time_series(self, ts_path, data_type, encoding, compressor):
+ """
+ create single time series
+ :param ts_path: String, complete time series path (starts from root)
+ :param data_type: TSDataType, data type for this time series
+ :param encoding: TSEncoding, encoding for this time series
+ :param compressor: Compressor, compressing type for this time series
+ """
+ data_type = data_type.value
+ encoding = encoding.value
+ compressor = compressor.value
+ request = TSCreateTimeseriesReq(self.__session_id, ts_path, data_type, encoding, compressor)
+ status = self.__client.createTimeseries(request)
+ print("creating time series {} message: {}".format(ts_path, status.message))
+
+ def create_multi_time_series(self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst):
+ """
+ create multiple time series
+ :param ts_path_lst: List of String, complete time series paths (starts from root)
+ :param data_type_lst: List of TSDataType, data types for time series
+ :param encoding_lst: List of TSEncoding, encodings for time series
+ :param compressor_lst: List of Compressor, compressing types for time series
+ """
+ data_type_lst = [data_type.value for data_type in data_type_lst]
+ encoding_lst = [encoding.value for encoding in encoding_lst]
+ compressor_lst = [compressor.value for compressor in compressor_lst]
+
+ request = TSCreateMultiTimeseriesReq(self.__session_id, ts_path_lst, data_type_lst,
+ encoding_lst, compressor_lst)
+ resp = self.__client.createMultiTimeseries(request)
+ print("creating multiple time series {} message: {}".format(ts_path_lst, resp.statusList[0].message))
+
+ def delete_time_series(self, paths_list):
+ """
+ delete multiple time series, including data and schema
+ :param paths_list: List of time series path, which should be complete (starts from root)
+ """
+ status = self.__client.deleteTimeseries(self.__session_id, paths_list)
+ print("deleting multiple time series {} message: {}".format(paths_list, status.message))
+
+ def check_time_series_exists(self, path):
+ """
+ check whether a specific time series exists
+ :param path: String, complete path of time series for checking
+ :return: Boolean, whether the time series exists.
+ """
+ data_set = self.execute_query_statement("SHOW TIMESERIES {}".format(path))
+ result = data_set.has_next()
+ data_set.close_operation_handle()
+ return result
+
+ def delete_data(self, paths_list, timestamp):
+ """
+ delete all data whose timestamp is <= the given timestamp in multiple time series
+ :param paths_list: list of the time series whose data will be deleted.
+ :param timestamp: data with a timestamp less than or equal to this value will be deleted.
+ """
+ request = TSDeleteDataReq(self.__session_id, paths_list, timestamp)
+ try:
+ status = self.__client.deleteData(request)
+ print("delete data from {}, message: {}".format(paths_list, status.message))
+ except TTransport.TException as e:
+ print("data deletion fails because: ", e)
+
+ def insert_str_record(self, device_id, timestamp, measurements, string_values):
+ """ special case for inserting one row of String (TEXT) value """
+ data_types = [TSDataType.TEXT.value for _ in string_values]
+ request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, string_values)
+ status = self.__client.insertRecord(request)
+ print("insert one record to device {} message: {}".format(device_id, status.message))
+
+ def insert_record(self, device_id, timestamp, measurements, data_types, values):
+ """
+ insert one row of record into the database; if you want to improve performance, please use the insert_tablet method
+ for example a record at time=10086 with three measurements is:
+ timestamp, m1, m2, m3
+ 10086, 125.3, True, text1
+ :param device_id: String, time series path for device
+ :param timestamp: Integer, indicate the timestamp of the row of data
+ :param measurements: List of String, sensor names
+ :param data_types: List of TSDataType, indicate the data type for each sensor
+ :param values: List, values to be inserted, for each sensor
+ """
+ data_types = [data_type.value for data_type in data_types]
+ request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
+ status = self.__client.insertRecord(request)
+ print("insert one record to device {} message: {}".format(device_id, status.message))
+
+ def insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
+ """
+ insert multiple rows of data; records are independent of each other, in other words, there is no relationship
+ between those records
+ :param device_ids: List of String, time series paths for device
+ :param times: List of Integer, timestamps for records
+ :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
+ :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
+ :param values_lst: 2-D List, values to be inserted, for each device
+ """
+ type_values_lst = []
+ for types in types_lst:
+ data_types = [data_type.value for data_type in types]
+ type_values_lst.append(data_types)
+ request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
+ resp = self.__client.insertRecords(request)
+ print("insert multiple records to devices {} message: {}".format(device_ids, resp.statusList[0].message))
+
+ def test_insert_record(self, device_id, timestamp, measurements, data_types, values):
+ """
+ this method does NOT insert data into the database; the server just returns after accepting the request.
+ It should be used to measure the other time costs on the client side
+ :param device_id: String, time series path for device
+ :param timestamp: Integer, indicate the timestamp of the row of data
+ :param measurements: List of String, sensor names
+ :param data_types: List of TSDataType, indicate the data type for each sensor
+ :param values: List, values to be inserted, for each sensor
+ """
+ data_types = [data_type.value for data_type in data_types]
+ request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
+ status = self.__client.testInsertRecord(request)
+ print("testing! insert one record to device {} message: {}".format(device_id, status.message))
+
+ def test_insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
+ """
+ this method does NOT insert data into the database; the server just returns after accepting the request.
+ It should be used to measure the other time costs on the client side
+ :param device_ids: List of String, time series paths for device
+ :param times: List of Integer, timestamps for records
+ :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
+ :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
+ :param values_lst: 2-D List, values to be inserted, for each device
+ """
+ type_values_lst = []
+ for types in types_lst:
+ data_types = [data_type.value for data_type in types]
+ type_values_lst.append(data_types)
+ request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
+ status = self.__client.testInsertRecords(request)
+ print("testing! insert multiple records, message: {}".format(status.message))
+
+ def gen_insert_record_req(self, device_id, timestamp, measurements, data_types, values):
+ if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+ print("length of data types does not equal to length of values!")
+ # could raise an error here.
+ return
+ values_in_bytes = Session.value_to_bytes(data_types, values)
+ return TSInsertRecordReq(self.__session_id, device_id, measurements, values_in_bytes, timestamp)
+
+ def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
+ if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
+ (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
+ print("deviceIds, times, measurementsList and valuesList's size should be equal")
+ # could raise an error here.
+ return
+
+ value_lst = []
+ for values, data_types, measurements in zip(values_lst, types_lst, measurements_lst):
+ if (len(values) != len(data_types)) or (len(values) != len(measurements)):
+ print("deviceIds, times, measurementsList and valuesList's size should be equal")
+ # could raise an error here.
+ return
+ values_in_bytes = Session.value_to_bytes(data_types, values)
+ value_lst.append(values_in_bytes)
+
+ return TSInsertRecordsReq(self.__session_id, device_ids, measurements_lst, value_lst, times)
+
+ def insert_tablet(self, tablet):
+ """
+ insert one tablet; in a tablet, every timestamp has the same number of measurements
+ for example three records in the same device can form a tablet:
+ timestamps, m1, m2, m3
+ 1, 125.3, True, text1
+ 2, 111.6, False, text2
+ 3, 688.6, True, text3
+ Note: the tablet should not contain empty cells
+ The tablet itself is sorted (see docs of Tablet.py)
+ :param tablet: a tablet specified above
+ """
+ resp = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
+ print("insert one tablet to device {} message: {}".format(tablet.get_device_id(), resp.statusList[0].message))
+
+ def insert_tablets(self, tablet_lst):
+ """
+ insert multiple tablets; tablets are independent of each other
+ :param tablet_lst: List of tablets
+ """
+ resp = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
+ print("insert multiple tablets, message: {}".format(resp.statusList[0].message))
+
+ def test_insert_tablet(self, tablet):
+ """
+ this method does NOT insert data into the database; the server just returns after accepting the request.
+ It should be used to measure the other time costs on the client side
+ :param tablet: a tablet of data
+ """
+ status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
+ print("testing! insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message))
+
+ def test_insert_tablets(self, tablet_list):
+ """
+ this method does NOT insert data into the database; the server just returns after accepting the request.
+ It should be used to measure the other time costs on the client side
+ :param tablet_list: List of tablets
+ """
+ status = self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list))
+ print("testing! insert multiple tablets, message: {}".format(status.message))
+
+ def gen_insert_tablet_req(self, tablet):
+ data_type_values = [data_type.value for data_type in tablet.get_data_types()]
+ return TSInsertTabletReq(self.__session_id, tablet.get_device_id(), tablet.get_measurements(),
+ tablet.get_binary_values(), tablet.get_binary_timestamps(),
+ data_type_values, tablet.get_row_number())
+
+ def gen_insert_tablets_req(self, tablet_lst):
+ device_id_lst = []
+ measurements_lst = []
+ values_lst = []
+ timestamps_lst = []
+ type_lst = []
+ size_lst = []
+ for tablet in tablet_lst:
+ data_type_values = [data_type.value for data_type in tablet.get_data_types()]
+ device_id_lst.append(tablet.get_device_id())
+ measurements_lst.append(tablet.get_measurements())
+ values_lst.append(tablet.get_binary_values())
+ timestamps_lst.append(tablet.get_binary_timestamps())
+ type_lst.append(data_type_values)
+ size_lst.append(tablet.get_row_number())
+ return TSInsertTabletsReq(self.__session_id, device_id_lst, measurements_lst,
+ values_lst, timestamps_lst, type_lst, size_lst)
+
+ def execute_query_statement(self, sql):
+ """
+ execute a query sql statement and return a SessionDataSet
+ :param sql: String, query sql statement
+ :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
+ """
+ request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
+ resp = self.__client.executeQueryStatement(request)
+ return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
+ self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)
+
+ def execute_non_query_statement(self, sql):
+ """
+ execute non-query sql statement
+ :param sql: String, non-query sql statement
+ """
+ request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
+ try:
+ resp = self.__client.executeUpdateStatement(request)
+ status = resp.status
+ print("execute non-query statement {} message: {}".format(sql, status.message))
+ except TTransport.TException as e:
+ print("execution of non-query statement fails because: ", e)
+
+ @staticmethod
+ def value_to_bytes(data_types, values):
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for data_type, value in zip(data_types, values):
+ if data_type == TSDataType.BOOLEAN.value:
+ format_str_list.append("h")
+ format_str_list.append("?")
+ values_tobe_packed.append(TSDataType.BOOLEAN.value)
+ values_tobe_packed.append(value)
+ elif data_type == TSDataType.INT32.value:
+ format_str_list.append("h")
+ format_str_list.append("i")
+ values_tobe_packed.append(TSDataType.INT32.value)
+ values_tobe_packed.append(value)
+ elif data_type == TSDataType.INT64.value:
+ format_str_list.append("h")
+ format_str_list.append("q")
+ values_tobe_packed.append(TSDataType.INT64.value)
+ values_tobe_packed.append(value)
+ elif data_type == TSDataType.FLOAT.value:
+ format_str_list.append("h")
+ format_str_list.append("f")
+ values_tobe_packed.append(TSDataType.FLOAT.value)
+ values_tobe_packed.append(value)
+ elif data_type == TSDataType.DOUBLE.value:
+ format_str_list.append("h")
+ format_str_list.append("d")
+ values_tobe_packed.append(TSDataType.DOUBLE.value)
+ values_tobe_packed.append(value)
+ elif data_type == TSDataType.TEXT.value:
+ value_bytes = bytes(value, 'utf-8')
+ format_str_list.append("h")
+ format_str_list.append("i")
+ format_str_list.append(str(len(value_bytes)))
+ format_str_list.append("s")
+ values_tobe_packed.append(TSDataType.TEXT.value)
+ values_tobe_packed.append(len(value_bytes))
+ values_tobe_packed.append(value_bytes)
+ else:
+ print("Unsupported data type:" + str(data_type))
+ # could raise an error here.
+ return
+ format_str = ''.join(format_str_list)
+ return struct.pack(format_str, *values_tobe_packed)
+
+ def get_time_zone(self):
+ if self.__zone_id is not None:
+ return self.__zone_id
+ try:
+ resp = self.__client.getTimeZone(self.__session_id)
+ except TTransport.TException as e:
+ print("Could not get time zone because: ", e)
+ raise Exception
+ return resp.timeZone
+
+ def set_time_zone(self, zone_id):
+ request = TSSetTimeZoneReq(self.__session_id, zone_id)
+ try:
+ status = self.__client.setTimeZone(request)
+ print("setting time zone_id as {}, message: {}".format(zone_id, status.message))
+ except TTransport.TException as e:
+ print("Could not get time zone because: ", e)
+ raise Exception
+ self.__zone_id = zone_id
diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
new file mode 100644
index 0000000..2aac6e9
--- /dev/null
+++ b/client-py/src/SessionExample.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+sys.path.append("./utils")
+from IoTDBConstants import *
+from Tablet import Tablet
+from Session import Session
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = 'root'
+password_ = 'root'
+session = Session(ip, port_, username_, password_)
+session.open(False)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+session.delete_storage_group("root.sg_test_02")
+session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+
+# setting time series.
+session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
+session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
+session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)
+
+# setting multiple time series at once.
+ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
+ "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
+data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
+ TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)
+
+# delete time series
+session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"])
+
+# checking time series
+print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
+print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"))
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
+ TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
+session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
+
+# insert multiple records into database
+measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
+values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
+ [True, 77, 88, 1.25, 8.125, "test_records02"]]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_)
+
+# insert one tablet into the database.
+values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
+ [True, 100, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, 188.1, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
+session.insert_tablet(tablet_)
+
+# insert multiple tablets into database
+tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
+tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
+session.insert_tablets([tablet_01, tablet_02])
+
+# execute non-query sql statement
+session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
+session_data_set.set_fetch_size(1024)
+while session_data_set.has_next():
+ print(session_data_set.next())
+session_data_set.close_operation_handle()
+
+# close session connection.
+session.close()
+
+print("All executions done!!")
diff --git a/client-py/src/SessionUT.py b/client-py/src/SessionUT.py
new file mode 100644
index 0000000..fc79cfe
--- /dev/null
+++ b/client-py/src/SessionUT.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import sys
+import struct
+
+sys.path.append("./utils")
+
+from Session import Session
+from Tablet import Tablet
+from IoTDBConstants import *
+from SessionDataSet import SessionDataSet
+
+from thrift.protocol import TBinaryProtocol, TCompactProtocol
+from thrift.transport import TSocket, TTransport
+
+
+class MyTestCase(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.session = Session("127.0.0.1", 6667, "root", "root")
+ self.session.open(False)
+
+ def tearDown(self) -> None:
+ self.session.close()
+
+ def test_insert_by_str(self):
+ self.session.set_storage_group("root.sg1")
+
+ self.__createTimeSeries(self.session)
+ self.insertByStr(self.session)
+
+ # sql test
+ self.insert_via_sql(self.session)
+ self.query3()
+
+ def test_insert_by_blank_str_infer_type(self):
+ device_id = "root.sg1.d1"
+ measurements = ["s1"]
+ values = ["1.0"]
+ self.session.insert_str_record(device_id, 1, measurements, values)
+
+ expected = "root.sg1.d1.s1 "
+
+ self.assertFalse(self.session.check_time_series_exists("root.sg1.d1.s1 "))
+ data_set = self.session.execute_query_statement("show timeseries")
+ i = 0
+ while data_set.hasNext():
+ self.assertEquals(expected[i], str(data_set.next().get_fields().get(0)))
+ i += 1
+
+ @staticmethod
+ def __createTimeSeries(session):
+ session.create_time_series("root.sg1.d1.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+ session.create_time_series("root.sg1.d1.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+ session.create_time_series("root.sg1.d1.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+ session.create_time_series("root.sg1.d2.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+ session.create_time_series("root.sg1.d2.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+ session.create_time_series("root.sg1.d2.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
+
+ @staticmethod
+ def insertByStr(session):
+
+ device_id = "root.sg1.d1"
+ measurements = ["s1", "s2", "s3"]
+
+ for time in range(100):
+ values = ["1", "2", "3"]
+ session.insert_str_record(device_id, time, measurements, values)
+
+ @staticmethod
+ def insert_via_sql(session):
+ session.execute_non_query_statement("insert into root.sg1.d1(timestamp, s1, s2, s3) values(100, 1, 2, 3)")
+
+ def query3(self):
+ session_data_set = self.session.execute_query_statement("select * from root.sg1.d1")
+ session_data_set.set_fetch_size(1024)
+ count = 0
+ while session_data_set.has_next():
+ index = 1
+ count += 1
+ for f in session_data_set.next().get_fields():
+ self.assertEqual(str(index), f.get_string_value())
+ index += 1
+
+ self.assertEqual(101, count)
+ session_data_set.close_operation_handle()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/client-py/src/utils/Field.py b/client-py/src/utils/Field.py
new file mode 100644
index 0000000..7b5fea5
--- /dev/null
+++ b/client-py/src/utils/Field.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+
+
class Field(object):
    """One typed cell of a result row.

    A field is tagged with a ``TSDataType``; a data type of ``None`` marks a
    null field. The value is kept in the slot that matches the data type
    (bool, int, long, float, double or binary).
    """

    def __init__(self, data_type):
        """
        :param data_type: TSDataType
        """
        self.__data_type = data_type
        self.__bool_value = None
        self.__int_value = None
        self.__long_value = None
        self.__float_value = None
        self.__double_value = None
        self.__binary_value = None

    @staticmethod
    def _slot_for(data_type):
        """Return the accessor infix ("bool", "int", ...) that stores values of ``data_type``.

        :param data_type: TSDataType
        :raises Exception: if the data type is not one of the six supported types
        """
        known_slots = (
            (TSDataType.BOOLEAN, "bool"),
            (TSDataType.INT32, "int"),
            (TSDataType.INT64, "long"),
            (TSDataType.FLOAT, "float"),
            (TSDataType.DOUBLE, "double"),
            (TSDataType.TEXT, "binary"),
        )
        for candidate, slot in known_slots:
            if data_type == candidate:
                return slot
        raise Exception("unsupported data type {}".format(data_type))

    def _require_not_null(self):
        """Raise if this field carries no data type (i.e. it is a null field)."""
        if self.__data_type is None:
            raise Exception("Null Field Exception!")

    @staticmethod
    def copy(field):
        """Return a new Field with the same data type and the matching value of ``field``.

        :param field: Field to duplicate
        :raises Exception: if the field has an unsupported data type
        """
        duplicate = Field(field.get_data_type())
        data_type = duplicate.get_data_type()
        if data_type is None:
            return duplicate
        slot = Field._slot_for(data_type)
        value = getattr(field, "get_{}_value".format(slot))()
        getattr(duplicate, "set_{}_value".format(slot))(value)
        return duplicate

    def get_data_type(self):
        return self.__data_type

    def is_null(self):
        return self.__data_type is None

    def set_bool_value(self, value):
        self.__bool_value = value

    def get_bool_value(self):
        self._require_not_null()
        return self.__bool_value

    def set_int_value(self, value):
        self.__int_value = value

    def get_int_value(self):
        self._require_not_null()
        return self.__int_value

    def set_long_value(self, value):
        self.__long_value = value

    def get_long_value(self):
        self._require_not_null()
        return self.__long_value

    def set_float_value(self, value):
        self.__float_value = value

    def get_float_value(self):
        self._require_not_null()
        return self.__float_value

    def set_double_value(self, value):
        self.__double_value = value

    def get_double_value(self):
        self._require_not_null()
        return self.__double_value

    def set_binary_value(self, value):
        self.__binary_value = value

    def get_binary_value(self):
        self._require_not_null()
        return self.__binary_value

    def get_string_value(self):
        """Return the stored value as text; a null field yields the string "None".

        TEXT values are decoded from UTF-8, every other type goes through ``str``.

        :raises Exception: if the data type is unsupported
        """
        if self.__data_type is None:
            return "None"
        slot = Field._slot_for(self.__data_type)
        if slot == "binary":
            return self.__binary_value.decode('utf-8')
        stored = {
            "bool": self.__bool_value,
            "int": self.__int_value,
            "long": self.__long_value,
            "float": self.__float_value,
            "double": self.__double_value,
        }
        return str(stored[slot])

    def __str__(self):
        return self.get_string_value()

    def get_object_value(self, data_type):
        """
        Return the raw value stored in the slot selected by ``data_type``,
        or None when this field is null.

        :param data_type: TSDataType
        """
        if self.__data_type is None:
            return None
        slot = Field._slot_for(data_type)
        return getattr(self, "get_{}_value".format(slot))()

    @staticmethod
    def get_field(value, data_type):
        """
        Build a Field holding ``value``; returns None when ``value`` is None.

        :param value: field value corresponding to the data type
        :param data_type: TSDataType
        """
        if value is None:
            return None
        field = Field(data_type)
        slot = Field._slot_for(data_type)
        getattr(field, "set_{}_value".format(slot))(value)
        return field
+
diff --git a/client-py/src/utils/IoTDBConstants.py b/client-py/src/utils/IoTDBConstants.py
new file mode 100644
index 0000000..7e31262
--- /dev/null
+++ b/client-py/src/utils/IoTDBConstants.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from enum import Enum, unique
+
+
@unique
class TSDataType(Enum):
    """Value types of a time series; members are looked up by name or by integer code."""
    BOOLEAN = 0
    INT32 = 1
    INT64 = 2
    FLOAT = 3
    DOUBLE = 4
    TEXT = 5
+
+
@unique
class TSEncoding(Enum):
    """Encoding identifiers that can be chosen when a time series is created."""
    PLAIN = 0
    PLAIN_DICTIONARY = 1
    RLE = 2
    DIFF = 3
    TS_2DIFF = 4
    BITMAP = 5
    GORILLA = 6
    REGULAR = 7
+
+
@unique
class Compressor(Enum):
    """Compression identifiers that can be chosen when a time series is created."""
    UNCOMPRESSED = 0
    SNAPPY = 1
    GZIP = 2
    LZO = 3
    SDT = 4
    PAA = 5
    PLA = 6
diff --git a/client-py/src/utils/IoTDBRpcDataSet.py b/client-py/src/utils/IoTDBRpcDataSet.py
new file mode 100644
index 0000000..f712c98
--- /dev/null
+++ b/client-py/src/utils/IoTDBRpcDataSet.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import *
+
+from thrift.transport import TTransport
+from iotdb.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq
+
+
class IoTDBRpcDataSet(object):
    """Row cursor over the binary result of one query.

    The query result arrives as per-column byte buffers (``time``,
    ``bitmapList``, ``valueList``). Each call to :meth:`next` consumes the
    bytes of one row from the front of those buffers and, when they run
    out, fetches the next batch through the RPC client.
    """
    TIMESTAMP_STR = "Time"
    # VALUE_IS_NULL = "The value got by %s (column name) is NULL."
    START_INDEX = 2
    FLAG = 0x80

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
                 client, session_id, query_data_set, fetch_size):
        """
        :param sql: query statement, resent with every fetch request
        :param column_name_list: List of column names, duplicates allowed
        :param column_type_list: List of TSDataType member names, parallel to column_name_list
        :param column_name_index: dict name -> position in the value buffers, or None to
                                  number distinct names in order of first appearance
        :param ignore_timestamp: when True no "Time" column is added
        :param query_id: id of the query on the server
        :param client: RPC client used for fetchResults / closeOperation
        :param session_id: id of the owning session
        :param query_data_set: first batch of results, may be None
        :param fetch_size: number of rows requested per fetch
        """
        self.__session_id = session_id
        self.__ignore_timestamp = ignore_timestamp
        self.__sql = sql
        self.__query_id = query_id
        self.__client = client
        self.__fetch_size = fetch_size
        self.__column_size = len(column_name_list)

        self.__column_name_list = []
        self.__column_type_list = []
        self.__column_ordinal_dict = {}
        if not ignore_timestamp:
            self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
            self.__column_type_list.append(TSDataType.INT64)
            self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1

        if column_name_index is not None:
            self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))]
        else:
            self.__column_type_deduplicated_list = []
        next_ordinal = IoTDBRpcDataSet.START_INDEX
        for i, name in enumerate(column_name_list):
            data_type = TSDataType[column_type_list[i]]
            self.__column_name_list.append(name)
            self.__column_type_list.append(data_type)
            if name in self.__column_ordinal_dict:
                # repeated column: shares the ordinal and buffer of its first occurrence
                continue
            if column_name_index is not None:
                index = column_name_index[name]
                self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
                self.__column_type_deduplicated_list[index] = data_type
            else:
                self.__column_ordinal_dict[name] = next_ordinal
                next_ordinal += 1
                self.__column_type_deduplicated_list.append(data_type)

        self.__time_bytes = bytes(0)
        self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))]
        self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))]
        self.__query_data_set = query_data_set
        self.__is_closed = False
        self.__empty_resultSet = False
        self.__has_cached_record = False
        self.__rows_index = 0

    def close(self):
        """Close the query on the server; calling it again is a no-op.

        :raises Exception: if the close request fails at the transport level
        """
        if self.__is_closed:
            return
        if self.__client is not None:
            try:
                status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id))
                print("close session {}, message: {}".format(self.__session_id, status.message))
            except TTransport.TException as e:
                print("close session {} failed because: ".format(self.__session_id), e)
                # keep the transport error attached instead of raising an empty exception
                raise Exception("close session {} failed".format(self.__session_id)) from e

        self.__is_closed = True
        self.__client = None

    def next(self):
        """Advance to the next row, fetching a new batch when the cached one is used up.

        :return: True if a row was loaded, False when no more rows are available
        """
        if self.has_cached_result():
            self.construct_one_row()
            return True
        if self.__empty_resultSet:
            return False
        if self.fetch_results():
            self.construct_one_row()
            return True
        return False

    def has_cached_result(self):
        """Return True while the current batch still holds unread timestamp bytes."""
        return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0)

    def construct_one_row(self):
        """Consume the bytes of one row from the front of the batch buffers."""
        # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
        self.__time_bytes = self.__query_data_set.time[:8]
        self.__query_data_set.time = self.__query_data_set.time[8:]
        for i in range(len(self.__query_data_set.bitmapList)):
            bitmap_buffer = self.__query_data_set.bitmapList[i]

            # another 8 new rows, should move the bitmap buffer position to next byte
            if self.__rows_index % 8 == 0:
                self.__current_bitmap[i] = bitmap_buffer[0]
                self.__query_data_set.bitmapList[i] = bitmap_buffer[1:]
            if self.is_null(i, self.__rows_index):
                continue

            value_buffer = self.__query_data_set.valueList[i]
            data_type = self.__column_type_deduplicated_list[i]

            # simulating buffer: [start, end) is the payload, everything before `end` is consumed
            if data_type == TSDataType.BOOLEAN:
                start, end = 0, 1
            elif data_type in (TSDataType.INT32, TSDataType.FLOAT):
                start, end = 0, 4
            elif data_type in (TSDataType.INT64, TSDataType.DOUBLE):
                start, end = 0, 8
            elif data_type == TSDataType.TEXT:
                # TEXT is a 4-byte big-endian length followed by that many bytes
                length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False)
                start, end = 4, 4 + length
            else:
                print("unsupported data type {}.".format(data_type))
                # could raise exception here
                continue
            self.__value[i] = value_buffer[start:end]
            self.__query_data_set.valueList[i] = value_buffer[end:]
        self.__rows_index += 1
        self.__has_cached_record = True

    def fetch_results(self):
        """Request the next batch from the server.

        :return: True if the response carries a result set; False otherwise,
                 including when the request fails at the transport level
        """
        self.__rows_index = 0
        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
        try:
            resp = self.__client.fetchResults(request)
            if not resp.hasResultSet:
                self.__empty_resultSet = True
            else:
                self.__query_data_set = resp.queryDataSet
            return resp.hasResultSet
        except TTransport.TException as e:
            print("Cannot fetch result from server, because of network connection: ", e)
            return False

    def is_null(self, index, row_num):
        """Test the bitmap bit of row ``row_num`` for value column ``index``; a cleared bit means null."""
        bitmap = self.__current_bitmap[index]
        shift = row_num % 8
        return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0

    def is_null_by_index(self, column_index):
        """
        :param column_index: 1-based position in the column name list
        :return: True if the current row has no value in that column
        """
        index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        if index < 0:
            return False
        return self.is_null(index, self.__rows_index - 1)

    def is_null_by_name(self, column_name):
        """
        :param column_name: name of a column of this data set
        :return: True if the current row has no value in that column
        """
        index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        if index < 0:
            return False
        return self.is_null(index, self.__rows_index - 1)

    def find_column_name_by_index(self, column_index):
        """
        :param column_index: 1-based position in the column name list
        :raises Exception: if the index is not within 1..len(column names)
        """
        if column_index <= 0:
            raise Exception("Column index should start from 1")
        # report the bound that is actually checked (the name list may include "Time")
        column_count = len(self.__column_name_list)
        if column_index > column_count:
            raise Exception("column index {} out of range {}".format(column_index, column_count))
        return self.__column_name_list[column_index - 1]

    def get_fetch_size(self):
        return self.__fetch_size

    def set_fetch_size(self, fetch_size):
        self.__fetch_size = fetch_size

    def get_column_names(self):
        return self.__column_name_list

    def get_column_types(self):
        return self.__column_type_list

    def get_column_size(self):
        return self.__column_size

    def get_ignore_timestamp(self):
        return self.__ignore_timestamp

    def get_column_ordinal_dict(self):
        return self.__column_ordinal_dict

    def get_column_type_deduplicated_list(self):
        return self.__column_type_deduplicated_list

    def get_values(self):
        return self.__value

    def get_time_bytes(self):
        return self.__time_bytes

    def get_has_cached_record(self):
        return self.__has_cached_record
diff --git a/client-py/src/utils/RowRecord.py b/client-py/src/utils/RowRecord.py
new file mode 100644
index 0000000..78e0362
--- /dev/null
+++ b/client-py/src/utils/RowRecord.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+from Field import Field
+
+
class RowRecord(object):
    """One result row: a timestamp plus the list of its fields."""

    def __init__(self, timestamp, field_list=None):
        """
        :param timestamp: timestamp of the row
        :param field_list: List of fields; a new empty list is used when omitted
        """
        self.__timestamp = timestamp
        # never keep None here: add_field and __str__ both need a real list
        self.__field_list = [] if field_list is None else field_list

    def add_field(self, value, data_type=None):
        """Append a field to the row.

        Python keeps only the last definition of a method name, so the two
        call forms are served by one method.

        :param value: a ready-made field when ``data_type`` is omitted,
                      otherwise the raw value to wrap
        :param data_type: TSDataType of ``value``; when given, the field is
                          built with ``Field.get_field(value, data_type)``
        """
        if data_type is None:
            self.__field_list.append(value)
        else:
            self.__field_list.append(Field.get_field(value, data_type))

    def __str__(self):
        """Return the timestamp followed by every field, each preceded by two tabs."""
        str_list = [str(self.__timestamp)]
        for field in self.__field_list:
            str_list.append("\t\t")
            str_list.append(str(field))
        return "".join(str_list)

    def get_timestamp(self):
        return self.__timestamp

    def set_timestamp(self, timestamp):
        self.__timestamp = timestamp

    def get_fields(self):
        return self.__field_list

    def set_fields(self, field_list):
        self.__field_list = field_list

    def set_field(self, index, field):
        self.__field_list[index] = field
diff --git a/client-py/src/utils/SessionDataSet.py b/client-py/src/utils/SessionDataSet.py
new file mode 100644
index 0000000..92662cc
--- /dev/null
+++ b/client-py/src/utils/SessionDataSet.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+from IoTDBRpcDataSet import IoTDBRpcDataSet
+from Field import Field
+from RowRecord import RowRecord
+import struct
+
+
class SessionDataSet(object):
    """Iterates over the rows of a query result as RowRecord objects.

    Wraps an IoTDBRpcDataSet, which holds the raw bytes of the current row,
    and decodes those bytes into typed fields.
    """

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id,
                 query_data_set, ignore_timestamp):
        self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index,
                                                  ignore_timestamp, query_id, client, session_id, query_data_set, 1024)
        # True while a row loaded by has_next() has not been handed out by next() yet.
        # Kept here because the flag inside IoTDBRpcDataSet is name-mangled and has no setter.
        self.__has_cached_record = False

    def get_fetch_size(self):
        return self.iotdb_rpc_data_set.get_fetch_size()

    def set_fetch_size(self, fetch_size):
        self.iotdb_rpc_data_set.set_fetch_size(fetch_size)

    def get_column_names(self):
        return self.iotdb_rpc_data_set.get_column_names()

    def get_column_types(self):
        return self.iotdb_rpc_data_set.get_column_types()

    def has_next(self):
        """Load the next row into the underlying data set.

        :return: True if a row was loaded, False when the result is exhausted
        """
        has_row = self.iotdb_rpc_data_set.next()
        self.__has_cached_record = has_row
        return has_row

    def next(self):
        """Return the row loaded by the last has_next(), loading one first if needed.

        :return: RowRecord, or None when there are no more rows
        """
        if not self.__has_cached_record:
            if not self.has_next():
                return None
        # the row is consumed now; the following next() must advance on its own
        self.__has_cached_record = False
        return self.construct_row_record_from_value_array()

    def construct_row_record_from_value_array(self):
        """Decode the raw bytes of the current row into a RowRecord."""
        data_set = self.iotdb_rpc_data_set
        out_fields = []
        for i in range(data_set.get_column_size()):
            index = i + 1
            data_set_column_index = i + IoTDBRpcDataSet.START_INDEX
            if data_set.get_ignore_timestamp():
                # without a "Time" column both the name list and the column numbering start one earlier
                index -= 1
                data_set_column_index -= 1
            column_name = data_set.get_column_names()[index]
            location = data_set.get_column_ordinal_dict()[column_name] - IoTDBRpcDataSet.START_INDEX

            if data_set.is_null_by_index(data_set_column_index):
                out_fields.append(Field(None))
                continue

            value_bytes = data_set.get_values()[location]
            data_type = data_set.get_column_type_deduplicated_list()[location]
            field = Field(data_type)
            # fixed-size values are big-endian; TEXT is stored as raw bytes
            if data_type == TSDataType.BOOLEAN:
                field.set_bool_value(struct.unpack(">?", value_bytes)[0])
            elif data_type == TSDataType.INT32:
                field.set_int_value(struct.unpack(">i", value_bytes)[0])
            elif data_type == TSDataType.INT64:
                field.set_long_value(struct.unpack(">q", value_bytes)[0])
            elif data_type == TSDataType.FLOAT:
                field.set_float_value(struct.unpack(">f", value_bytes)[0])
            elif data_type == TSDataType.DOUBLE:
                field.set_double_value(struct.unpack(">d", value_bytes)[0])
            elif data_type == TSDataType.TEXT:
                field.set_binary_value(value_bytes)
            else:
                print("unsupported data type {}.".format(data_type))
                # could raise exception here
            out_fields.append(field)

        return RowRecord(struct.unpack(">q", data_set.get_time_bytes())[0], out_fields)

    def close_operation_handle(self):
        self.iotdb_rpc_data_set.close()
+
+
+
+
diff --git a/client-py/src/utils/Tablet.py b/client-py/src/utils/Tablet.py
new file mode 100644
index 0000000..51798a4
--- /dev/null
+++ b/client-py/src/utils/Tablet.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import *
+import struct
+
+
class Tablet(object):
    """A batch of rows for one device, serialized column by column for insertion."""

    def __init__(self, device_id, measurements, data_types, values, timestamps):
        """
        creating a tablet for insertion
          for example, considering device: root.sg1.d1
            timestamps,     m1,    m2,     m3
                     1,  125.3,  True,  text1
                     2,  111.6, False,  text2
                     3,  688.6,  True,  text3
        Notice: The tablet should not have empty cell
                The tablet will be sorted at the initialization by timestamps
                (rows with equal timestamps keep their input order)

        :param device_id: String, IoTDB time series path to device layer (without sensor).
        :param measurements: List, sensors.
        :param data_types: TSDataType List, specify value types for sensors.
        :param values: 2-D List, the values of each row should be the outer list element.
        :param timestamps: List.
        """
        if len(timestamps) != len(values):
            print("Input error! len(timestamps) does not equal to len(values)!")
            # could raise an error here.

        if not Tablet.check_sorted(timestamps):
            # order by timestamp only: comparing whole (timestamp, row) pairs would fall back to
            # comparing the rows on equal timestamps and fail on cells that cannot be ordered
            ordered = sorted(zip(timestamps, values), key=lambda pair: pair[0])
            self.__timestamps = [timestamp for timestamp, _ in ordered]
            self.__values = [row for _, row in ordered]
        else:
            self.__values = values
            self.__timestamps = timestamps

        self.__device_id = device_id
        self.__measurements = measurements
        self.__data_types = data_types
        self.__row_number = len(timestamps)
        self.__column_number = len(measurements)

    @staticmethod
    def check_sorted(timestamps):
        """Return True if the timestamps are in non-decreasing order."""
        return all(timestamps[i - 1] <= timestamps[i] for i in range(1, len(timestamps)))

    @staticmethod
    def _fixed_format_char(data_type):
        """Return the struct format character of a fixed-size type, or None for any other type."""
        fixed_formats = (
            (TSDataType.BOOLEAN, "?"),
            (TSDataType.INT32, "i"),
            (TSDataType.INT64, "q"),
            (TSDataType.FLOAT, "f"),
            (TSDataType.DOUBLE, "d"),
        )
        for candidate, format_char in fixed_formats:
            if data_type == candidate:
                return format_char
        return None

    def get_measurements(self):
        return self.__measurements

    def get_data_types(self):
        return self.__data_types

    def get_row_number(self):
        return self.__row_number

    def get_device_id(self):
        return self.__device_id

    def get_binary_timestamps(self):
        """Return all timestamps packed as big-endian 8-byte integers."""
        format_str = ">" + "q" * len(self.__timestamps)
        return struct.pack(format_str, *self.__timestamps)

    def get_binary_values(self):
        """Return all values packed big-endian, one whole column after another.

        TEXT cells are written as a 4-byte length followed by the UTF-8 bytes.

        :return: bytes, or None if a column has an unsupported data type
        """
        format_str_list = [">"]
        values_tobe_packed = []
        for i in range(self.__column_number):
            data_type = self.__data_types[i]
            format_char = Tablet._fixed_format_char(data_type)
            if format_char is not None:
                # "<rows><char>" packs the whole column with a single repeat count
                format_str_list.append(str(self.__row_number))
                format_str_list.append(format_char)
                for j in range(self.__row_number):
                    values_tobe_packed.append(self.__values[j][i])
            elif data_type == TSDataType.TEXT:
                for j in range(self.__row_number):
                    value_bytes = bytes(self.__values[j][i], 'utf-8')
                    format_str_list.append("i")
                    format_str_list.append(str(len(value_bytes)))
                    format_str_list.append("s")
                    values_tobe_packed.append(len(value_bytes))
                    values_tobe_packed.append(value_bytes)
            else:
                print("Unsupported data type:" + str(data_type))
                # could raise an error here.
                return

        format_str = ''.join(format_str_list)
        return struct.pack(format_str, *values_tobe_packed)
+
diff --git a/client-py/src/utils/__init__.py b/client-py/src/utils/__init__.py
new file mode 100644
index 0000000..a4797b6
--- /dev/null
+++ b/client-py/src/utils/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+