You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/01/10 06:21:38 UTC
[iotdb] 01/01: [IOTDB-1801] Python APIs for aligned timeseries
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch IOTDB-1801
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a4ebbe375f1eda3472c50a37ac2f41eedf10b5c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Jan 10 14:20:22 2022 +0800
[IOTDB-1801] Python APIs for aligned timeseries
---
client-py/SessionAlignedTimeseriesExample.py | 197 +++++++++++++++++++
client-py/SessionAlignedTimeseriesTest.py | 280 +++++++++++++++++++++++++++
client-py/iotdb/Session.py | 209 +++++++++++++++++++-
3 files changed, 676 insertions(+), 10 deletions(-)
diff --git a/client-py/SessionAlignedTimeseriesExample.py b/client-py/SessionAlignedTimeseriesExample.py
new file mode 100644
index 0000000..a54b169
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesExample.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# The imports below need the iotdb package: run from client-py, or install the apache-iotdb module with pip3
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+session.delete_storage_group("root.sg_test_02")
+session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+
+# setting aligned time series.
+measurements_lst_ = [
+ "s_01",
+ "s_02",
+ "s_03",
+]
+data_type_lst_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+ "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series at once.
+measurements_lst_ = [
+ "s_04",
+ "s_05",
+ "s_06",
+ "s_07",
+ "s_08",
+ "s_09",
+]
+data_type_lst_ = [
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+ "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+session.delete_time_series(
+ [
+ "root.sg_test_01.d_02.s_07",
+ "root.sg_test_01.d_02.s_08",
+ "root.sg_test_01.d_02.s_09",
+ ]
+)
+
+# checking time series
+print(
+ "s_07 expecting False, checking result: ",
+ session.check_time_series_exists("root.sg_test_01.d_02.s_07"),
+)
+print(
+ "s_03 expecting True, checking result: ",
+ session.check_time_series_exists("root.sg_test_01.d_02.s_03"),
+)
+
+# insert one aligned record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+session.insert_aligned_record("root.sg_test_01.d_02", 1, measurements_, data_types_, values_)
+
+# insert multiple aligned records into database
+measurements_list_ = [
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+session.insert_aligned_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+)
+
+# insert one aligned tablet into the database.
+values_ = [
+ [False, 10, 11, 1.1, 10011.1, "test01"],
+ [True, 100, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, 188.1, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, "test04"],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert multiple aligned tablets into database
+tablet_01 = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+session.insert_aligned_tablets([tablet_01, tablet_02])
+
+# insert one aligned tablet with empty cells into the database.
+values_ = [
+ [None, 10, 11, 1.1, 10011.1, "test01"],
+ [True, None, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, None, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, None],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [16, 17, 18, 19]
+tablet_ = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert aligned records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_aligned_records_of_one_device(
+ "root.sg_test_01.d_02", time_list, measurements_list, data_types_list, values_list
+)
+
+# execute non-query sql statement
+session.execute_non_query_statement(
+ "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+)
+
+# execute sql query statement
+with session.execute_query_statement(
+ "select * from root.sg_test_01.d_02"
+) as session_data_set:
+ session_data_set.set_fetch_size(1024)
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+# close session connection.
+session.close()
+
+print("All executions done!!")
diff --git a/client-py/SessionAlignedTimeseriesTest.py b/client-py/SessionAlignedTimeseriesTest.py
new file mode 100644
index 0000000..fbd36df
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesTest.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# The imports below need the iotdb package: run from client-py, or install the apache-iotdb module with pip3
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail():
+    # record one more failed check and mark the whole run as failed
+    global final_flag, failed_count
+    failed_count += 1
+    final_flag = False
+
+
+def print_message(message):
+    banner = "*********"
+    for line in (banner, message, banner):  # message framed by a banner line above and below
+        print(line)
+
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+if not session.is_open():
+ print("can't open session")
+ exit(1)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+
+if session.delete_storage_group("root.sg_test_02") < 0:
+ test_fail()
+ print_message("delete storage group failed")
+
+if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
+ test_fail()
+ print_message("delete storage groups failed")
+
+# setting aligned time series.
+measurements_lst_ = [
+ "s_01",
+ "s_02",
+ "s_03",
+]
+data_type_lst_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+ "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series at once.
+measurements_lst_ = [
+ "s_04",
+ "s_05",
+ "s_06",
+ "s_07",
+ "s_08",
+ "s_09",
+]
+data_type_lst_ = [
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+ "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+if (
+ session.delete_time_series(
+ [
+ "root.sg_test_01.d_02.s_07",
+ "root.sg_test_01.d_02.s_08",
+ "root.sg_test_01.d_02.s_09",
+ ]
+ )
+ < 0
+):
+ test_fail()
+ print_message("delete time series failed")
+
+# checking time series
+# s_07 expecting False
+if session.check_time_series_exists("root.sg_test_01.d_02.s_07"):
+ test_fail()
+ print_message("root.sg_test_01.d_02.s_07 shouldn't exist")
+
+# s_03 expecting True
+if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"):
+ test_fail()
+ print_message("root.sg_test_01.d_02.s_03 should exist")
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+if (
+ session.insert_aligned_record(
+ "root.sg_test_01.d_02", 1, measurements_, data_types_, values_
+ )
+ < 0
+):
+ test_fail()
+ print_message("insert record failed")
+
+# insert multiple records into database
+measurements_list_ = [
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+if (
+ session.insert_aligned_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+ )
+ < 0
+):
+ test_fail()
+ print_message("insert records failed")
+
+# insert one tablet into the database.
+values_ = [
+ [False, 10, 11, 1.1, 10011.1, "test01"],
+ [True, 100, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, 188.1, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, "test04"],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+ test_fail()
+ print_message("insert tablet failed")
+
+# insert multiple tablets into database
+tablet_01 = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0:
+ test_fail()
+ print_message("insert tablets failed")
+
+# insert one tablet with empty cells into the database.
+values_ = [
+ [None, 10, 11, 1.1, 10011.1, "test01"],
+ [True, None, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, None, 688.25, "test03"],
+ [True, 0, 0, 0, None, None],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [20, 21, 22, 23]
+tablet_ = Tablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+ test_fail()
+ print_message("insert tablet with empty cells failed")
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+if (
+ session.insert_aligned_records_of_one_device(
+ "root.sg_test_01.d_02",
+ time_list,
+ measurements_list,
+ data_types_list,
+ values_list,
+ )
+ < 0
+):
+ test_fail()
+ print_message("insert records of one device failed")
+
+# execute non-query sql statement
+if (
+ session.execute_non_query_statement(
+ "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+ )
+ < 0
+):
+ test_fail()
+ print_message(
+ "execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed"
+ )
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02")
+session_data_set.set_fetch_size(1024)
+expect_count = 20
+actual_count = 0
+while session_data_set.has_next():
+ print(session_data_set.next())
+ actual_count += 1
+session_data_set.close_operation_handle()
+
+if actual_count != expect_count:
+ test_fail()
+ print_message(
+ "query count mismatch: expect count: "
+ + str(expect_count)
+ + " actual count: "
+ + str(actual_count)
+ )
+
+# close session connection.
+session.close()
+
+if final_flag:
+ print("All executions done!!")
+else:
+ print("Some test failed, please have a check")
+ print("failed count: ", failed_count)
+ exit(1)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index ccd0fdd..4a825a3 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -38,7 +38,7 @@ from .thrift.rpc.TSIService import (
TSInsertRecordsReq,
TSInsertRecordsOfOneDeviceReq,
)
-from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
+from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq, TSCreateAlignedTimeseriesReq
# for debug
# from IoTDBConstants import *
@@ -211,6 +211,33 @@ class Session(object):
return Session.verify_success(status)
+    def create_aligned_time_series(
+        self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+    ):
+        """
+        create aligned time series under one device
+        :param device_id: String, device id for timeseries (starts from root)
+        :param measurements_lst: List of String, measurement ids for time series
+        :param data_type_lst: List of TSDataType, data types for time series
+        :param encoding_lst: List of TSEncoding, encodings for time series
+        :param compressor_lst: List of Compressor, compressing types for time series
+        :return: result of Session.verify_success for the returned status
+        """
+        data_type_lst = [data_type.value for data_type in data_type_lst]
+        encoding_lst = [encoding.value for encoding in encoding_lst]
+        compressor_lst = [compressor.value for compressor in compressor_lst]
+        request = TSCreateAlignedTimeseriesReq(
+            self.__session_id, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+        )
+        status = self.__client.createAlignedTimeseries(request)
+        logger.debug(
+            "creating aligned time series of device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
def create_multi_time_series(
self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst
):
@@ -296,6 +323,25 @@ class Session(object):
return Session.verify_success(status)
+    def insert_aligned_str_record(self, device_id, timestamp, measurements, string_values):
+        """special case for inserting one aligned row of String (TEXT) value;
+        a bare str given for measurements / string_values is wrapped into a one-element list"""
+        if isinstance(string_values, str):
+            string_values = [string_values]
+        if isinstance(measurements, str):
+            measurements = [measurements]
+        data_types = [TSDataType.TEXT.value for _ in string_values]
+        request = self.gen_insert_str_record_req(
+            device_id, timestamp, measurements, data_types, string_values, True
+        )
+        status = self.__client.insertStringRecord(request)
+        logger.debug(
+            "insert one record to device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+        return Session.verify_success(status)
+
def insert_record(self, device_id, timestamp, measurements, data_types, values):
"""
insert one row of record into database, if you want improve your performance, please use insertTablet method
@@ -348,6 +394,58 @@ class Session(object):
)
return Session.verify_success(status)
+    def insert_aligned_record(self, device_id, timestamp, measurements, data_types, values):
+        """
+        insert one aligned row into the database; if you want to improve your performance, please use insertTablet method
+        example of a row at time=10086 holding three measurements:
+            timestamp,    m1,   m2,    m3
+                10086, 125.3, True, text1
+        :param device_id: String, time series path for device
+        :param timestamp: Integer, the timestamp of this row of data
+        :param measurements: List of String, sensor names
+        :param data_types: List of TSDataType, one data type per sensor
+        :param values: List, one value to be inserted per sensor
+        """
+        type_codes = [t.value for t in data_types]
+        status = self.__client.insertRecord(
+            self.gen_insert_record_req(
+                device_id, timestamp, measurements, type_codes, values, True
+            )
+        )
+        message = "insert one record to device {} message: {}".format(
+            device_id, status.message
+        )
+        logger.debug(message)
+
+        return Session.verify_success(status)
+
+    def insert_aligned_records(
+        self, device_ids, times, measurements_lst, types_lst, values_lst
+    ):
+        """
+        insert multiple aligned rows of data; the records are independent of each other, in other words,
+        there's no relationship between those records
+        :param device_ids: List of String, time series paths for device
+        :param times: List of Integer, timestamps for records
+        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
+        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
+        :param values_lst: 2-D List, values to be inserted, for each device
+        """
+        # turn every TSDataType into its raw .value before the request is built
+        type_values_lst = [
+            [data_type.value for data_type in types] for types in types_lst
+        ]
+        status = self.__client.insertRecords(
+            self.gen_insert_records_req(
+                device_ids, times, measurements_lst, type_values_lst, values_lst, True
+            )
+        )
+        message = "insert multiple records to devices {} message: {}".format(
+            device_ids, status.message
+        )
+        logger.debug(message)
+
+        return Session.verify_success(status)
def test_insert_record(
self, device_id, timestamp, measurements, data_types, values
@@ -401,7 +499,7 @@ class Session(object):
return Session.verify_success(status)
def gen_insert_record_req(
- self, device_id, timestamp, measurements, data_types, values
+ self, device_id, timestamp, measurements, data_types, values, is_aligned=False
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
@@ -409,22 +507,22 @@ class Session(object):
)
values_in_bytes = Session.value_to_bytes(data_types, values)
return TSInsertRecordReq(
- self.__session_id, device_id, measurements, values_in_bytes, timestamp
+ self.__session_id, device_id, measurements, values_in_bytes, timestamp, is_aligned
)
def gen_insert_str_record_req(
- self, device_id, timestamp, measurements, data_types, values
+ self, device_id, timestamp, measurements, data_types, values, is_aligned=False
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
"length of data types does not equal to length of values!"
)
return TSInsertStringRecordReq(
- self.__session_id, device_id, measurements, values, timestamp
+ self.__session_id, device_id, measurements, values, timestamp, is_aligned
)
def gen_insert_records_req(
- self, device_ids, times, measurements_lst, types_lst, values_lst
+ self, device_ids, times, measurements_lst, types_lst, values_lst, is_aligned=False
):
if (
(len(device_ids) != len(measurements_lst))
@@ -448,7 +546,7 @@ class Session(object):
value_lst.append(values_in_bytes)
return TSInsertRecordsReq(
- self.__session_id, device_ids, measurements_lst, value_lst, times
+ self.__session_id, device_ids, measurements_lst, value_lst, times, is_aligned
)
def insert_tablet(self, tablet):
@@ -482,6 +580,37 @@ class Session(object):
return Session.verify_success(status)
+    def insert_aligned_tablet(self, tablet):
+        """
+        insert one aligned tablet; within a tablet every timestamp carries the same number of measurements
+        for example, three records of the same device can form this tablet:
+            timestamps,    m1,    m2,    m3
+                     1, 125.3,  True, text1
+                     2, 111.6, False, text2
+                     3, 688.6,  True, text3
+        Notice: From 0.13.0, the tablet can contain empty cell
+        The tablet itself is sorted (see docs of Tablet.py)
+        :param tablet: a tablet specified above
+        """
+        request = self.gen_insert_tablet_req(tablet, True)
+        status = self.__client.insertTablet(request)
+        message = "insert one tablet to device {} message: {}".format(
+            tablet.get_device_id(), status.message
+        )
+        logger.debug(message)
+
+        return Session.verify_success(status)
+
+    def insert_aligned_tablets(self, tablet_lst):
+        """
+        insert multiple aligned tablets in one request, tablets are independent of each other
+        :param tablet_lst: List of tablets
+        """
+        request = self.gen_insert_tablets_req(tablet_lst, True)
+        status = self.__client.insertTablets(request)
+        logger.debug("insert multiple tablets, message: {}".format(status.message))
+        return Session.verify_success(status)
+
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
):
@@ -540,8 +669,65 @@ class Session(object):
return Session.verify_success(status)
+    def insert_aligned_records_of_one_device(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        """sort the given rows by timestamp, then insert them through the *_sorted variant"""
+        # key on the timestamp only (stable sort): rows with equal timestamps must not compare their payloads
+        rows = sorted(
+            zip(times_list, measurements_list, types_list, values_list),
+            key=lambda row: row[0],
+        )
+        if rows:  # zip(*[]) yields nothing to unpack, so empty input is passed on unchanged
+            times_list, measurements_list, types_list, values_list = map(list, zip(*rows))
+
+        return self.insert_aligned_records_of_one_device_sorted(
+            device_id, times_list, measurements_list, types_list, values_list
+        )
+
+    def insert_aligned_records_of_one_device_sorted(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        """
+        Insert multiple aligned rows of one device in a single request, which can reduce the overhead of
+        network (just like jdbc executeBatch). A RuntimeError is raised when the four lists differ in size
+        or the timestamps are not sorted. If you want to improve your performance, please see insertTablet method
+
+        :param device_id: device id
+        :param times_list: timestamps list
+        :param measurements_list: measurements list
+        :param types_list: types list
+        :param values_list: values list
+        """
+        # all four lists describe the same rows, so their sizes have to agree
+        row_count = len(times_list)
+        sizes_agree = all(
+            len(column) == row_count
+            for column in (measurements_list, types_list, values_list)
+        )
+        if not sizes_agree:
+            raise RuntimeError(
+                "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
+            )
+
+        # refuse timestamps that are not sorted
+        if not Session.check_sorted(times_list):
+            raise RuntimeError(
+                "insert records of one device error: timestamp not sorted"
+            )
+
+        # build the request and send it
+        status = self.__client.insertRecordsOfOneDevice(
+            self.gen_insert_records_of_one_device_request(
+                device_id, times_list, measurements_list, values_list, types_list, True
+            )
+        )
+        logger.debug("insert records of one device, message: {}".format(status.message))
+
+        return Session.verify_success(status)
+
def gen_insert_records_of_one_device_request(
- self, device_id, times_list, measurements_list, values_list, types_list
+ self, device_id, times_list, measurements_list, values_list, types_list, is_aligned=False
):
binary_value_list = []
for values, data_types, measurements in zip(
@@ -561,6 +747,7 @@ class Session(object):
measurements_list,
binary_value_list,
times_list,
+ is_aligned
)
def test_insert_tablet(self, tablet):
@@ -593,7 +780,7 @@ class Session(object):
return Session.verify_success(status)
- def gen_insert_tablet_req(self, tablet):
+ def gen_insert_tablet_req(self, tablet, is_aligned=False):
data_type_values = [data_type.value for data_type in tablet.get_data_types()]
return TSInsertTabletReq(
self.__session_id,
@@ -603,9 +790,10 @@ class Session(object):
tablet.get_binary_timestamps(),
data_type_values,
tablet.get_row_number(),
+ is_aligned,
)
- def gen_insert_tablets_req(self, tablet_lst):
+ def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
device_id_lst = []
measurements_lst = []
values_lst = []
@@ -630,6 +818,7 @@ class Session(object):
timestamps_lst,
type_lst,
size_lst,
+ is_aligned,
)
def execute_query_statement(self, sql, timeout=0):