You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/01/10 06:21:38 UTC

[iotdb] 01/01: [IOTDB-1801] Python APIs for aligned timeseries

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IOTDB-1801
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a4ebbe375f1eda3472c50a37ac2f41eedf10b5c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Jan 10 14:20:22 2022 +0800

    [IOTDB-1801] Python APIs for aligned timeseries
---
 client-py/SessionAlignedTimeseriesExample.py | 197 +++++++++++++++++++
 client-py/SessionAlignedTimeseriesTest.py    | 280 +++++++++++++++++++++++++++
 client-py/iotdb/Session.py                   | 209 +++++++++++++++++++-
 3 files changed, 676 insertions(+), 10 deletions(-)

diff --git a/client-py/SessionAlignedTimeseriesExample.py b/client-py/SessionAlignedTimeseriesExample.py
new file mode 100644
index 0000000..a54b169
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesExample.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+session.delete_storage_group("root.sg_test_02")
+session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+
+# setting aligned time series.
+measurements_lst_ = [
+    "s_01",
+    "s_02",
+    "s_03",
+]
+data_type_lst_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series once.
+measurements_lst_ = [
+    "s_04",
+    "s_05",
+    "s_06",
+    "s_07",
+    "s_08",
+    "s_09",
+]
+data_type_lst_ = [
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+session.delete_time_series(
+    [
+        "root.sg_test_01.d_02.s_07",
+        "root.sg_test_01.d_02.s_08",
+        "root.sg_test_01.d_02.s_09",
+    ]
+)
+
+# checking time series
+print(
+    "s_07 expecting False, checking result: ",
+    session.check_time_series_exists("root.sg_test_01.d_02.s_07"),
+)
+print(
+    "s_03 expecting True, checking result: ",
+    session.check_time_series_exists("root.sg_test_01.d_02.s_03"),
+)
+
+# insert one aligned record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+session.insert_aligned_record("root.sg_test_01.d_02", 1, measurements_, data_types_, values_)
+
+# insert multiple aligned records into database
+measurements_list_ = [
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+    [False, 22, 33, 4.4, 55.1, "test_records01"],
+    [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+session.insert_aligned_records(
+    device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+)
+
+# insert one aligned tablet into the database.
+values_ = [
+    [False, 10, 11, 1.1, 10011.1, "test01"],
+    [True, 100, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, 188.1, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, "test04"],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert multiple aligned tablets into database
+tablet_01 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+session.insert_aligned_tablets([tablet_01, tablet_02])
+
+# insert one aligned tablet with empty cells into the database.
+values_ = [
+    [None, 10, 11, 1.1, 10011.1, "test01"],
+    [True, None, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, None, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, None],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [16, 17, 18, 19]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert aligned records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_aligned_records_of_one_device(
+    "root.sg_test_01.d_02", time_list, measurements_list, data_types_list, values_list
+)
+
+# execute non-query sql statement
+session.execute_non_query_statement(
+    "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+)
+
+# execute sql query statement
+with session.execute_query_statement(
+    "select * from root.sg_test_01.d_02"
+) as session_data_set:
+    session_data_set.set_fetch_size(1024)
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+# close session connection.
+session.close()
+
+print("All executions done!!")
diff --git a/client-py/SessionAlignedTimeseriesTest.py b/client-py/SessionAlignedTimeseriesTest.py
new file mode 100644
index 0000000..fbd36df
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesTest.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail():
+    global failed_count
+    global final_flag
+    final_flag = False
+    failed_count += 1
+
+
+def print_message(message):
+    print("*********")
+    print(message)
+    print("*********")
+
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+if not session.is_open():
+    print("can't open session")
+    exit(1)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+
+if session.delete_storage_group("root.sg_test_02") < 0:
+    test_fail()
+    print_message("delete storage group failed")
+
+if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
+    test_fail()
+    print_message("delete storage groups failed")
+
+# setting aligned time series.
+measurements_lst_ = [
+    "s_01",
+    "s_02",
+    "s_03",
+]
+data_type_lst_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series once.
+measurements_lst_ = [
+    "s_04",
+    "s_05",
+    "s_06",
+    "s_07",
+    "s_08",
+    "s_09",
+]
+data_type_lst_ = [
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+if (
+    session.delete_time_series(
+        [
+            "root.sg_test_01.d_02.s_07",
+            "root.sg_test_01.d_02.s_08",
+            "root.sg_test_01.d_02.s_09",
+        ]
+    )
+    < 0
+):
+    test_fail()
+    print_message("delete time series failed")
+
+# checking time series
+# s_07 expecting False
+if session.check_time_series_exists("root.sg_test_01.d_02.s_07"):
+    test_fail()
+    print_message("root.sg_test_01.d_02.s_07 shouldn't exist")
+
+# s_03 expecting True
+if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"):
+    test_fail()
+    print_message("root.sg_test_01.d_02.s_03 should exist")
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+if (
+    session.insert_aligned_record(
+        "root.sg_test_01.d_02", 1, measurements_, data_types_, values_
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert record failed")
+
+# insert multiple records into database
+measurements_list_ = [
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+    [False, 22, 33, 4.4, 55.1, "test_records01"],
+    [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+if (
+    session.insert_aligned_records(
+        device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert records failed")
+
+# insert one tablet into the database.
+values_ = [
+    [False, 10, 11, 1.1, 10011.1, "test01"],
+    [True, 100, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, 188.1, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, "test04"],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+    test_fail()
+    print_message("insert tablet failed")
+
+# insert multiple tablets into database
+tablet_01 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0:
+    test_fail()
+    print_message("insert tablets failed")
+
+# insert one tablet with empty cells into the database.
+values_ = [
+    [None, 10, 11, 1.1, 10011.1, "test01"],
+    [True, None, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, None, 688.25, "test03"],
+    [True, 0, 0, 0, None, None],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [20, 21, 22, 23]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+    test_fail()
+    print_message("insert tablet with empty cells failed")
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+if (
+    session.insert_aligned_records_of_one_device(
+        "root.sg_test_01.d_02",
+        time_list,
+        measurements_list,
+        data_types_list,
+        values_list,
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert records of one device failed")
+
+# execute non-query sql statement
+if (
+    session.execute_non_query_statement(
+        "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+    )
+    < 0
+):
+    test_fail()
+    print_message(
+        "execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed"
+    )
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02")
+session_data_set.set_fetch_size(1024)
+expect_count = 20
+actual_count = 0
+while session_data_set.has_next():
+    print(session_data_set.next())
+    actual_count += 1
+session_data_set.close_operation_handle()
+
+if actual_count != expect_count:
+    test_fail()
+    print_message(
+        "query count mismatch: expect count: "
+        + str(expect_count)
+        + " actual count: "
+        + str(actual_count)
+    )
+
+# close session connection.
+session.close()
+
+if final_flag:
+    print("All executions done!!")
+else:
+    print("Some test failed, please have a check")
+    print("failed count: ", failed_count)
+    exit(1)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index ccd0fdd..4a825a3 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -38,7 +38,7 @@ from .thrift.rpc.TSIService import (
     TSInsertRecordsReq,
     TSInsertRecordsOfOneDeviceReq,
 )
-from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
+from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq, TSCreateAlignedTimeseriesReq
 
 # for debug
 # from IoTDBConstants import *
@@ -211,6 +211,33 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def create_aligned_time_series(
+            self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+    ):
+        """
+        create aligned time series
+        :param device_id: String, device id for timeseries (starts from root)
+        :param measurements_lst: List of String, measurement ids for time series
+        :param data_type_lst: List of TSDataType, data types for time series
+        :param encoding_lst: List of TSEncoding, encodings for time series
+        :param compressor_lst: List of Compressor, compressing types for time series
+        """
+        data_type_lst = [data_type.value for data_type in data_type_lst]
+        encoding_lst = [encoding.value for encoding in encoding_lst]
+        compressor_lst = [compressor.value for compressor in compressor_lst]
+
+        request = TSCreateAlignedTimeseriesReq(
+            self.__session_id, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+        )
+        status = self.__client.createAlignedTimeseries(request)
+        logger.debug(
+            "creating aligned time series of device {} message: {}".format(
+                measurements_lst, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
     def create_multi_time_series(
         self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst
     ):
@@ -296,6 +323,25 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_str_record(self, device_id, timestamp, measurements, string_values):
+        """ special case for inserting one row of String (TEXT) value """
+        if type(string_values) == str:
+            string_values = [string_values]
+        if type(measurements) == str:
+            measurements = [measurements]
+        data_types = [TSDataType.TEXT.value for _ in string_values]
+        request = self.gen_insert_str_record_req(
+            device_id, timestamp, measurements, data_types, string_values, True
+        )
+        status = self.__client.insertStringRecord(request)
+        logger.debug(
+            "insert one record to device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
         insert one row of record into database, if you want improve your performance, please use insertTablet method
@@ -348,6 +394,58 @@ class Session(object):
         )
 
         return Session.verify_success(status)
+    def insert_aligned_record(self, device_id, timestamp, measurements, data_types, values):
+        """
+        insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
+            for example a record at time=10086 with three measurements is:
+                timestamp,     m1,    m2,     m3
+                    10086,  125.3,  True,  text1
+        :param device_id: String, time series path for device
+        :param timestamp: Integer, indicate the timestamp of the row of data
+        :param measurements: List of String, sensor names
+        :param data_types: List of TSDataType, indicate the data type for each sensor
+        :param values: List, values to be inserted, for each sensor
+        """
+        data_types = [data_type.value for data_type in data_types]
+        request = self.gen_insert_record_req(
+            device_id, timestamp, measurements, data_types, values, True
+        )
+        status = self.__client.insertRecord(request)
+        logger.debug(
+            "insert one record to device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
+    def insert_aligned_records(
+        self, device_ids, times, measurements_lst, types_lst, values_lst
+    ):
+        """
+        insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
+        between those records
+        :param device_ids: List of String, time series paths for device
+        :param times: List of Integer, timestamps for records
+        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
+        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
+        :param values_lst: 2-D List, values to be inserted, for each device
+        """
+        type_values_lst = []
+        for types in types_lst:
+            data_types = [data_type.value for data_type in types]
+            type_values_lst.append(data_types)
+        request = self.gen_insert_records_req(
+            device_ids, times, measurements_lst, type_values_lst, values_lst, True
+        )
+        status = self.__client.insertRecords(request)
+        logger.debug(
+            "insert multiple records to devices {} message: {}".format(
+                device_ids, status.message
+            )
+        )
+
+        return Session.verify_success(status)
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -401,7 +499,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_record_req(
-        self, device_id, timestamp, measurements, data_types, values
+        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -409,22 +507,22 @@ class Session(object):
             )
         values_in_bytes = Session.value_to_bytes(data_types, values)
         return TSInsertRecordReq(
-            self.__session_id, device_id, measurements, values_in_bytes, timestamp
+            self.__session_id, device_id, measurements, values_in_bytes, timestamp, is_aligned
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values
+        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
                 "length of data types does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
-            self.__session_id, device_id, measurements, values, timestamp
+            self.__session_id, device_id, measurements, values, timestamp, is_aligned
         )
 
     def gen_insert_records_req(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+        self, device_ids, times, measurements_lst, types_lst, values_lst, is_aligned=False
     ):
         if (
             (len(device_ids) != len(measurements_lst))
@@ -448,7 +546,7 @@ class Session(object):
             value_lst.append(values_in_bytes)
 
         return TSInsertRecordsReq(
-            self.__session_id, device_ids, measurements_lst, value_lst, times
+            self.__session_id, device_ids, measurements_lst, value_lst, times, is_aligned
         )
 
     def insert_tablet(self, tablet):
@@ -482,6 +580,37 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_tablet(self, tablet):
+        """
+        insert one aligned tablet, in a tablet, for each timestamp, the number of measurements is same
+            for example three records in the same device can form a tablet:
+                timestamps,     m1,    m2,     m3
+                         1,  125.3,  True,  text1
+                         2,  111.6, False,  text2
+                         3,  688.6,  True,  text3
+        Notice: From 0.13.0, the tablet can contain empty cell
+                The tablet itself is sorted (see docs of Tablet.py)
+        :param tablet: a tablet specified above
+        """
+        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
+        logger.debug(
+            "insert one tablet to device {} message: {}".format(
+                tablet.get_device_id(), status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
+    def insert_aligned_tablets(self, tablet_lst):
+        """
+        insert multiple aligned tablets, tablets are independent to each other
+        :param tablet_lst: List of tablets
+        """
+        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst, True))
+        logger.debug("insert multiple tablets, message: {}".format(status.message))
+
+        return Session.verify_success(status)
+
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
     ):
@@ -540,8 +669,65 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_records_of_one_device(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        # sort by timestamp
+        sorted_zipped = sorted(
+            zip(times_list, measurements_list, types_list, values_list)
+        )
+        result = zip(*sorted_zipped)
+        times_list, measurements_list, types_list, values_list = [
+            list(x) for x in result
+        ]
+
+        return self.insert_aligned_records_of_one_device_sorted(
+            device_id, times_list, measurements_list, types_list, values_list
+        )
+
+    def insert_aligned_records_of_one_device_sorted(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        """
+        Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
+        executeBatch, we pack some insert request in batch and send them to server. If you want to improve
+        your performance, please see insertTablet method
+
+        :param device_id: device id
+        :param times_list: timestamps list
+        :param measurements_list: measurements list
+        :param types_list: types list
+        :param values_list: values list
+        """
+        # check parameter
+        size = len(times_list)
+        if (
+            size != len(measurements_list)
+            or size != len(types_list)
+            or size != len(values_list)
+        ):
+            raise RuntimeError(
+                "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
+            )
+
+        # check sorted
+        if not Session.check_sorted(times_list):
+            raise RuntimeError(
+                "insert records of one device error: timestamp not sorted"
+            )
+
+        request = self.gen_insert_records_of_one_device_request(
+            device_id, times_list, measurements_list, values_list, types_list, True
+        )
+
+        # send request
+        status = self.__client.insertRecordsOfOneDevice(request)
+        logger.debug("insert records of one device, message: {}".format(status.message))
+
+        return Session.verify_success(status)
+
     def gen_insert_records_of_one_device_request(
-        self, device_id, times_list, measurements_list, values_list, types_list
+        self, device_id, times_list, measurements_list, values_list, types_list, is_aligned=False
     ):
         binary_value_list = []
         for values, data_types, measurements in zip(
@@ -561,6 +747,7 @@ class Session(object):
             measurements_list,
             binary_value_list,
             times_list,
+            is_aligned
         )
 
     def test_insert_tablet(self, tablet):
@@ -593,7 +780,7 @@ class Session(object):
 
         return Session.verify_success(status)
 
-    def gen_insert_tablet_req(self, tablet):
+    def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
         return TSInsertTabletReq(
             self.__session_id,
@@ -603,9 +790,10 @@ class Session(object):
             tablet.get_binary_timestamps(),
             data_type_values,
             tablet.get_row_number(),
+            is_aligned,
         )
 
-    def gen_insert_tablets_req(self, tablet_lst):
+    def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
         device_id_lst = []
         measurements_lst = []
         values_lst = []
@@ -630,6 +818,7 @@ class Session(object):
             timestamps_lst,
             type_lst,
             size_lst,
+            is_aligned,
         )
 
     def execute_query_statement(self, sql, timeout=0):