You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/01/10 06:21:37 UTC

[iotdb] branch IOTDB-1801 created (now 3a4ebbe)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch IOTDB-1801
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 3a4ebbe  [IOTDB-1801] Python APIs for aligned timeseries

This branch includes the following new commits:

     new 3a4ebbe  [IOTDB-1801] Python APIs for aligned timeseries

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-1801] Python APIs for aligned timeseries

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IOTDB-1801
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a4ebbe375f1eda3472c50a37ac2f41eedf10b5c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Jan 10 14:20:22 2022 +0800

    [IOTDB-1801] Python APIs for aligned timeseries
---
 client-py/SessionAlignedTimeseriesExample.py | 197 +++++++++++++++++++
 client-py/SessionAlignedTimeseriesTest.py    | 280 +++++++++++++++++++++++++++
 client-py/iotdb/Session.py                   | 209 +++++++++++++++++++-
 3 files changed, 676 insertions(+), 10 deletions(-)

diff --git a/client-py/SessionAlignedTimeseriesExample.py b/client-py/SessionAlignedTimeseriesExample.py
new file mode 100644
index 0000000..a54b169
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesExample.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+session.delete_storage_group("root.sg_test_02")
+session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+
+# setting aligned time series.
+measurements_lst_ = [
+    "s_01",
+    "s_02",
+    "s_03",
+]
+data_type_lst_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series once.
+measurements_lst_ = [
+    "s_04",
+    "s_05",
+    "s_06",
+    "s_07",
+    "s_08",
+    "s_09",
+]
+data_type_lst_ = [
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+session.delete_time_series(
+    [
+        "root.sg_test_01.d_02.s_07",
+        "root.sg_test_01.d_02.s_08",
+        "root.sg_test_01.d_02.s_09",
+    ]
+)
+
+# checking time series
+print(
+    "s_07 expecting False, checking result: ",
+    session.check_time_series_exists("root.sg_test_01.d_02.s_07"),
+)
+print(
+    "s_03 expecting True, checking result: ",
+    session.check_time_series_exists("root.sg_test_01.d_02.s_03"),
+)
+
+# insert one aligned record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+session.insert_aligned_record("root.sg_test_01.d_02", 1, measurements_, data_types_, values_)
+
+# insert multiple aligned records into database
+measurements_list_ = [
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+    [False, 22, 33, 4.4, 55.1, "test_records01"],
+    [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+session.insert_aligned_records(
+    device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+)
+
+# insert one aligned tablet into the database.
+values_ = [
+    [False, 10, 11, 1.1, 10011.1, "test01"],
+    [True, 100, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, 188.1, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, "test04"],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert multiple aligned tablets into database
+tablet_01 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+session.insert_aligned_tablets([tablet_01, tablet_02])
+
+# insert one aligned tablet with empty cells into the database.
+values_ = [
+    [None, 10, 11, 1.1, 10011.1, "test01"],
+    [True, None, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, None, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, None],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [16, 17, 18, 19]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+session.insert_aligned_tablet(tablet_)
+
+# insert aligned records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_aligned_records_of_one_device(
+    "root.sg_test_01.d_02", time_list, measurements_list, data_types_list, values_list
+)
+
+# execute non-query sql statement
+session.execute_non_query_statement(
+    "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+)
+
+# execute sql query statement
+with session.execute_query_statement(
+    "select * from root.sg_test_01.d_02"
+) as session_data_set:
+    session_data_set.set_fetch_size(1024)
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+# close session connection.
+session.close()
+
+print("All executions done!!")
diff --git a/client-py/SessionAlignedTimeseriesTest.py b/client-py/SessionAlignedTimeseriesTest.py
new file mode 100644
index 0000000..fbd36df
--- /dev/null
+++ b/client-py/SessionAlignedTimeseriesTest.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail():
+    global failed_count
+    global final_flag
+    final_flag = False
+    failed_count += 1
+
+
+def print_message(message):
+    print("*********")
+    print(message)
+    print("*********")
+
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
+session.open(False)
+
+if not session.is_open():
+    print("can't open session")
+    exit(1)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+
+if session.delete_storage_group("root.sg_test_02") < 0:
+    test_fail()
+    print_message("delete storage group failed")
+
+if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
+    test_fail()
+    print_message("delete storage groups failed")
+
+# setting aligned time series.
+measurements_lst_ = [
+    "s_01",
+    "s_02",
+    "s_03",
+]
+data_type_lst_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# setting more aligned time series once.
+measurements_lst_ = [
+    "s_04",
+    "s_05",
+    "s_06",
+    "s_07",
+    "s_08",
+    "s_09",
+]
+data_type_lst_ = [
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_aligned_time_series(
+    "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+if (
+    session.delete_time_series(
+        [
+            "root.sg_test_01.d_02.s_07",
+            "root.sg_test_01.d_02.s_08",
+            "root.sg_test_01.d_02.s_09",
+        ]
+    )
+    < 0
+):
+    test_fail()
+    print_message("delete time series failed")
+
+# checking time series
+# s_07 expecting False
+if session.check_time_series_exists("root.sg_test_01.d_02.s_07"):
+    test_fail()
+    print_message("root.sg_test_01.d_02.s_07 shouldn't exist")
+
+# s_03 expecting True
+if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"):
+    test_fail()
+    print_message("root.sg_test_01.d_02.s_03 should exist")
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+]
+if (
+    session.insert_aligned_record(
+        "root.sg_test_01.d_02", 1, measurements_, data_types_, values_
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert record failed")
+
+# insert multiple records into database
+measurements_list_ = [
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+    [False, 22, 33, 4.4, 55.1, "test_records01"],
+    [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
+if (
+    session.insert_aligned_records(
+        device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert records failed")
+
+# insert one tablet into the database.
+values_ = [
+    [False, 10, 11, 1.1, 10011.1, "test01"],
+    [True, 100, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, 188.1, 688.25, "test03"],
+    [True, 0, 0, 0, 6.25, "test04"],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+    test_fail()
+    print_message("insert tablet failed")
+
+# insert multiple tablets into database
+tablet_01 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
+)
+if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0:
+    test_fail()
+    print_message("insert tablets failed")
+
+# insert one tablet with empty cells into the database.
+values_ = [
+    [None, 10, 11, 1.1, 10011.1, "test01"],
+    [True, None, 11111, 1.25, 101.0, "test02"],
+    [False, 100, 1, None, 688.25, "test03"],
+    [True, 0, 0, 0, None, None],
+]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [20, 21, 22, 23]
+tablet_ = Tablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_aligned_tablet(tablet_) < 0:
+    test_fail()
+    print_message("insert tablet with empty cells failed")
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+if (
+    session.insert_aligned_records_of_one_device(
+        "root.sg_test_01.d_02",
+        time_list,
+        measurements_list,
+        data_types_list,
+        values_list,
+    )
+    < 0
+):
+    test_fail()
+    print_message("insert records of one device failed")
+
+# execute non-query sql statement
+if (
+    session.execute_non_query_statement(
+        "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
+    )
+    < 0
+):
+    test_fail()
+    print_message(
+        "execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed"
+    )
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02")
+session_data_set.set_fetch_size(1024)
+expect_count = 20
+actual_count = 0
+while session_data_set.has_next():
+    print(session_data_set.next())
+    actual_count += 1
+session_data_set.close_operation_handle()
+
+if actual_count != expect_count:
+    test_fail()
+    print_message(
+        "query count mismatch: expect count: "
+        + str(expect_count)
+        + " actual count: "
+        + str(actual_count)
+    )
+
+# close session connection.
+session.close()
+
+if final_flag:
+    print("All executions done!!")
+else:
+    print("Some test failed, please have a check")
+    print("failed count: ", failed_count)
+    exit(1)
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index ccd0fdd..4a825a3 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -38,7 +38,7 @@ from .thrift.rpc.TSIService import (
     TSInsertRecordsReq,
     TSInsertRecordsOfOneDeviceReq,
 )
-from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
+from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq, TSCreateAlignedTimeseriesReq
 
 # for debug
 # from IoTDBConstants import *
@@ -211,6 +211,33 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def create_aligned_time_series(
+            self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+    ):
+        """
+        create aligned time series
+        :param device_id: String, device id for timeseries (starts from root)
+        :param measurements_lst: List of String, measurement ids for time series
+        :param data_type_lst: List of TSDataType, data types for time series
+        :param encoding_lst: List of TSEncoding, encodings for time series
+        :param compressor_lst: List of Compressor, compressing types for time series
+        """
+        data_type_lst = [data_type.value for data_type in data_type_lst]
+        encoding_lst = [encoding.value for encoding in encoding_lst]
+        compressor_lst = [compressor.value for compressor in compressor_lst]
+
+        request = TSCreateAlignedTimeseriesReq(
+            self.__session_id, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+        )
+        status = self.__client.createAlignedTimeseries(request)
+        logger.debug(
+            "creating aligned time series of device {} message: {}".format(
+                measurements_lst, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
     def create_multi_time_series(
         self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst
     ):
@@ -296,6 +323,25 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_str_record(self, device_id, timestamp, measurements, string_values):
+        """ special case for inserting one row of String (TEXT) value """
+        if type(string_values) == str:
+            string_values = [string_values]
+        if type(measurements) == str:
+            measurements = [measurements]
+        data_types = [TSDataType.TEXT.value for _ in string_values]
+        request = self.gen_insert_str_record_req(
+            device_id, timestamp, measurements, data_types, string_values, True
+        )
+        status = self.__client.insertStringRecord(request)
+        logger.debug(
+            "insert one record to device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
     def insert_record(self, device_id, timestamp, measurements, data_types, values):
         """
         insert one row of record into database, if you want improve your performance, please use insertTablet method
@@ -348,6 +394,58 @@ class Session(object):
         )
 
         return Session.verify_success(status)
+    def insert_aligned_record(self, device_id, timestamp, measurements, data_types, values):
+        """
+        insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
+            for example a record at time=10086 with three measurements is:
+                timestamp,     m1,    m2,     m3
+                    10086,  125.3,  True,  text1
+        :param device_id: String, time series path for device
+        :param timestamp: Integer, indicate the timestamp of the row of data
+        :param measurements: List of String, sensor names
+        :param data_types: List of TSDataType, indicate the data type for each sensor
+        :param values: List, values to be inserted, for each sensor
+        """
+        data_types = [data_type.value for data_type in data_types]
+        request = self.gen_insert_record_req(
+            device_id, timestamp, measurements, data_types, values, True
+        )
+        status = self.__client.insertRecord(request)
+        logger.debug(
+            "insert one record to device {} message: {}".format(
+                device_id, status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
+    def insert_aligned_records(
+        self, device_ids, times, measurements_lst, types_lst, values_lst
+    ):
+        """
+        insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
+        between those records
+        :param device_ids: List of String, time series paths for device
+        :param times: List of Integer, timestamps for records
+        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
+        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
+        :param values_lst: 2-D List, values to be inserted, for each device
+        """
+        type_values_lst = []
+        for types in types_lst:
+            data_types = [data_type.value for data_type in types]
+            type_values_lst.append(data_types)
+        request = self.gen_insert_records_req(
+            device_ids, times, measurements_lst, type_values_lst, values_lst, True
+        )
+        status = self.__client.insertRecords(request)
+        logger.debug(
+            "insert multiple records to devices {} message: {}".format(
+                device_ids, status.message
+            )
+        )
+
+        return Session.verify_success(status)
 
     def test_insert_record(
         self, device_id, timestamp, measurements, data_types, values
@@ -401,7 +499,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_record_req(
-        self, device_id, timestamp, measurements, data_types, values
+        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -409,22 +507,22 @@ class Session(object):
             )
         values_in_bytes = Session.value_to_bytes(data_types, values)
         return TSInsertRecordReq(
-            self.__session_id, device_id, measurements, values_in_bytes, timestamp
+            self.__session_id, device_id, measurements, values_in_bytes, timestamp, is_aligned
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values
+        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
                 "length of data types does not equal to length of values!"
             )
         return TSInsertStringRecordReq(
-            self.__session_id, device_id, measurements, values, timestamp
+            self.__session_id, device_id, measurements, values, timestamp, is_aligned
         )
 
     def gen_insert_records_req(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+        self, device_ids, times, measurements_lst, types_lst, values_lst, is_aligned=False
     ):
         if (
             (len(device_ids) != len(measurements_lst))
@@ -448,7 +546,7 @@ class Session(object):
             value_lst.append(values_in_bytes)
 
         return TSInsertRecordsReq(
-            self.__session_id, device_ids, measurements_lst, value_lst, times
+            self.__session_id, device_ids, measurements_lst, value_lst, times, is_aligned
         )
 
     def insert_tablet(self, tablet):
@@ -482,6 +580,37 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_tablet(self, tablet):
+        """
+        insert one aligned tablet, in a tablet, for each timestamp, the number of measurements is same
+            for example three records in the same device can form a tablet:
+                timestamps,     m1,    m2,     m3
+                         1,  125.3,  True,  text1
+                         2,  111.6, False,  text2
+                         3,  688.6,  True,  text3
+        Notice: From 0.13.0, the tablet can contain empty cell
+                The tablet itself is sorted (see docs of Tablet.py)
+        :param tablet: a tablet specified above
+        """
+        status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True))
+        logger.debug(
+            "insert one tablet to device {} message: {}".format(
+                tablet.get_device_id(), status.message
+            )
+        )
+
+        return Session.verify_success(status)
+
+    def insert_aligned_tablets(self, tablet_lst):
+        """
+        insert multiple aligned tablets, tablets are independent to each other
+        :param tablet_lst: List of tablets
+        """
+        status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst, True))
+        logger.debug("insert multiple tablets, message: {}".format(status.message))
+
+        return Session.verify_success(status)
+
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
     ):
@@ -540,8 +669,65 @@ class Session(object):
 
         return Session.verify_success(status)
 
+    def insert_aligned_records_of_one_device(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        # sort by timestamp
+        sorted_zipped = sorted(
+            zip(times_list, measurements_list, types_list, values_list)
+        )
+        result = zip(*sorted_zipped)
+        times_list, measurements_list, types_list, values_list = [
+            list(x) for x in result
+        ]
+
+        return self.insert_aligned_records_of_one_device_sorted(
+            device_id, times_list, measurements_list, types_list, values_list
+        )
+
+    def insert_aligned_records_of_one_device_sorted(
+        self, device_id, times_list, measurements_list, types_list, values_list
+    ):
+        """
+        Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
+        executeBatch, we pack some insert request in batch and send them to server. If you want to improve
+        your performance, please see insertTablet method
+
+        :param device_id: device id
+        :param times_list: timestamps list
+        :param measurements_list: measurements list
+        :param types_list: types list
+        :param values_list: values list
+        """
+        # check parameter
+        size = len(times_list)
+        if (
+            size != len(measurements_list)
+            or size != len(types_list)
+            or size != len(values_list)
+        ):
+            raise RuntimeError(
+                "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
+            )
+
+        # check sorted
+        if not Session.check_sorted(times_list):
+            raise RuntimeError(
+                "insert records of one device error: timestamp not sorted"
+            )
+
+        request = self.gen_insert_records_of_one_device_request(
+            device_id, times_list, measurements_list, values_list, types_list, True
+        )
+
+        # send request
+        status = self.__client.insertRecordsOfOneDevice(request)
+        logger.debug("insert records of one device, message: {}".format(status.message))
+
+        return Session.verify_success(status)
+
     def gen_insert_records_of_one_device_request(
-        self, device_id, times_list, measurements_list, values_list, types_list
+        self, device_id, times_list, measurements_list, values_list, types_list, is_aligned=False
     ):
         binary_value_list = []
         for values, data_types, measurements in zip(
@@ -561,6 +747,7 @@ class Session(object):
             measurements_list,
             binary_value_list,
             times_list,
+            is_aligned
         )
 
     def test_insert_tablet(self, tablet):
@@ -593,7 +780,7 @@ class Session(object):
 
         return Session.verify_success(status)
 
-    def gen_insert_tablet_req(self, tablet):
+    def gen_insert_tablet_req(self, tablet, is_aligned=False):
         data_type_values = [data_type.value for data_type in tablet.get_data_types()]
         return TSInsertTabletReq(
             self.__session_id,
@@ -603,9 +790,10 @@ class Session(object):
             tablet.get_binary_timestamps(),
             data_type_values,
             tablet.get_row_number(),
+            is_aligned,
         )
 
-    def gen_insert_tablets_req(self, tablet_lst):
+    def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
         device_id_lst = []
         measurements_lst = []
         values_lst = []
@@ -630,6 +818,7 @@ class Session(object):
             timestamps_lst,
             type_lst,
             size_lst,
+            is_aligned,
         )
 
     def execute_query_statement(self, sql, timeout=0):