You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/21 13:09:08 UTC

[incubator-iotdb] 01/04: python session client ver-0.10.0

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 239c7dcb28cd9ce2d74ea0e6ecf270d8fcbde629
Author: Zekun Li <lz...@gmail.com>
AuthorDate: Mon Jul 20 14:16:45 2020 +0800

    python session client ver-0.10.0
---
 client-py/src/Session.py               | 460 +++++++++++++++++++++++++++++++++
 client-py/src/SessionExample.py        | 105 ++++++++
 client-py/src/SessionUT.py             | 107 ++++++++
 client-py/src/utils/Field.py           | 176 +++++++++++++
 client-py/src/utils/IoTDBConstants.py  |  52 ++++
 client-py/src/utils/IoTDBRpcDataSet.py | 219 ++++++++++++++++
 client-py/src/utils/RowRecord.py       |  55 ++++
 client-py/src/utils/SessionDataSet.py  | 101 ++++++++
 client-py/src/utils/Tablet.py          | 133 ++++++++++
 client-py/src/utils/__init__.py        |  18 ++
 10 files changed, 1426 insertions(+)

diff --git a/client-py/src/Session.py b/client-py/src/Session.py
new file mode 100644
index 0000000..c43f274
--- /dev/null
+++ b/client-py/src/Session.py
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import struct
+
+# if you run compile.sh, you can use the following code:
+sys.path.append("../target")
+
+# if you use maven to compile the thrift api, just use the following code:
+# sys.path.append("../../thrift/target/generated-sources-python")
+
+sys.path.append("./utils")
+
+from IoTDBConstants import *
+from SessionDataSet import SessionDataSet
+
+from thrift.protocol import TBinaryProtocol, TCompactProtocol
+from thrift.transport import TSocket, TTransport
+
+from iotdb.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
+     TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
+     TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq
+
+
class Session(object):
    """
    Client session for an Apache IoTDB server, speaking the thrift RPC
    protocol (IOTDB_SERVICE_PROTOCOL_V2).  A Session must be open()ed
    before use and close()d afterwards.
    """

    DEFAULT_FETCH_SIZE = 10000
    DEFAULT_USER = 'root'
    DEFAULT_PASSWORD = 'root'

    def __init__(self, host, port, user=DEFAULT_USER, password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE):
        """
        :param host: String, server host name or IP
        :param port: server thrift port
        :param user: String, user name
        :param password: String, password
        :param fetch_size: Integer, number of rows fetched per RPC during queries
        """
        self.__host = host
        self.__port = port
        self.__user = user
        self.__password = password
        self.__fetch_size = fetch_size
        self.__is_close = True
        self.__transport = None
        self.__client = None
        self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2
        self.__session_id = None
        self.__statement_id = None
        self.__zone_id = None

    def open(self, enable_rpc_compression):
        """
        open the connection to the server and authenticate
        :param enable_rpc_compression: Boolean, True uses the thrift compact
            protocol, False the plain binary protocol (must match the server)
        :raise: the underlying thrift/session error if connecting or
            authentication fails
        """
        if not self.__is_close:
            return
        self.__transport = TTransport.TBufferedTransport(TSocket.TSocket(self.__host, self.__port))

        if not self.__transport.isOpen():
            try:
                self.__transport.open()
            except TTransport.TTransportException as e:
                print('TTransportException: ', e)
                # bug fix: previously the error was swallowed and the method
                # carried on with an unusable transport; propagate it instead.
                raise

        if enable_rpc_compression:
            self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
        else:
            self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))

        open_req = TSOpenSessionReq(client_protocol=self.protocol_version,
                                    username=self.__user,
                                    password=self.__password)

        try:
            open_resp = self.__client.openSession(open_req)

            if self.protocol_version != open_resp.serverProtocolVersion:
                print("Protocol differ, Client version is {}, but Server version is {}".format(
                    self.protocol_version, open_resp.serverProtocolVersion))
                # version is less than 0.10
                if open_resp.serverProtocolVersion == 0:
                    raise TTransport.TException(message="Protocol not supported.")

            self.__session_id = open_resp.sessionId
            self.__statement_id = self.__client.requestStatementId(self.__session_id)

        except Exception as e:
            self.__transport.close()
            print("session closed because: ", e)
            # bug fix: re-raise instead of leaving a half-initialized session
            # that would crash on the time-zone calls below.
            raise

        if self.__zone_id is not None:
            self.set_time_zone(self.__zone_id)
        else:
            self.__zone_id = self.get_time_zone()

        self.__is_close = False

    def close(self):
        """close the session and the underlying transport (idempotent)"""
        if self.__is_close:
            return
        req = TSCloseSessionReq(self.__session_id)
        try:
            self.__client.closeSession(req)
        except TTransport.TException as e:
            print("Error occurs when closing session at server. Maybe server is down. Error message: ", e)
        finally:
            # always mark closed and release the socket, even if the RPC failed
            self.__is_close = True
            if self.__transport is not None:
                self.__transport.close()

    def set_storage_group(self, group_name):
        """
        set one storage group
        :param group_name: String, storage group name (starts from root)
        """
        status = self.__client.setStorageGroup(self.__session_id, group_name)
        print("setting storage group {} message: {}".format(group_name, status.message))

    def delete_storage_group(self, storage_group):
        """
        delete one storage group.
        :param storage_group: String, path of the target storage group.
        """
        groups = [storage_group]
        self.delete_storage_groups(groups)

    def delete_storage_groups(self, storage_group_lst):
        """
        delete multiple storage groups.
        :param storage_group_lst: List, paths of the target storage groups.
        """
        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
        print("delete storage group(s) {} message: {}".format(storage_group_lst, status.message))

    def create_time_series(self, ts_path, data_type, encoding, compressor):
        """
        create single time series
        :param ts_path: String, complete time series path (starts from root)
        :param data_type: TSDataType, data type for this time series
        :param encoding: TSEncoding, encoding for this time series
        :param compressor: Compressor, compressing type for this time series
        """
        data_type = data_type.value
        encoding = encoding.value
        compressor = compressor.value
        request = TSCreateTimeseriesReq(self.__session_id, ts_path, data_type, encoding, compressor)
        status = self.__client.createTimeseries(request)
        print("creating time series {} message: {}".format(ts_path, status.message))

    def create_multi_time_series(self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst):
        """
        create multiple time series
        :param ts_path_lst: List of String, complete time series paths (starts from root)
        :param data_type_lst: List of TSDataType, data types for time series
        :param encoding_lst: List of TSEncoding, encodings for time series
        :param compressor_lst: List of Compressor, compressing types for time series
        """
        data_type_lst = [data_type.value for data_type in data_type_lst]
        encoding_lst = [encoding.value for encoding in encoding_lst]
        compressor_lst = [compressor.value for compressor in compressor_lst]

        request = TSCreateMultiTimeseriesReq(self.__session_id, ts_path_lst, data_type_lst,
                                             encoding_lst, compressor_lst)
        resp = self.__client.createMultiTimeseries(request)
        print("creating multiple time series {} message: {}".format(ts_path_lst, resp.statusList[0].message))

    def delete_time_series(self, paths_list):
        """
        delete multiple time series, including data and schema
        :param paths_list: List of time series path, which should be complete (starts from root)
        """
        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
        print("deleting multiple time series {} message: {}".format(paths_list, status.message))

    def check_time_series_exists(self, path):
        """
        check whether a specific time series exists
        :param path: String, complete path of time series for checking
        :return Boolean value indicates whether it exists.
        """
        data_set = self.execute_query_statement("SHOW TIMESERIES {}".format(path))
        result = data_set.has_next()
        data_set.close_operation_handle()
        return result

    def delete_data(self, paths_list, timestamp):
        """
        delete all data <= time in multiple time series
        :param paths_list: time series list that the data in.
        :param timestamp: data with time stamp less than or equal to time will be deleted.
        """
        request = TSDeleteDataReq(self.__session_id, paths_list, timestamp)
        try:
            status = self.__client.deleteData(request)
            print("delete data from {}, message: {}".format(paths_list, status.message))
        except TTransport.TException as e:
            print("data deletion fails because: ", e)

    def insert_str_record(self, device_id, timestamp, measurements, string_values):
        """ special case for inserting one row of String (TEXT) value """
        data_types = [TSDataType.TEXT.value for _ in string_values]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, string_values)
        status = self.__client.insertRecord(request)
        print("insert one record to device {} message: {}".format(device_id, status.message))

    def insert_record(self, device_id, timestamp, measurements, data_types, values):
        """
        insert one row of record into database, if you want improve your performance, please use insertTablet method
            for example a record at time=10086 with three measurements is:
                timestamp,     m1,    m2,     m3
                    10086,  125.3,  True,  text1
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param data_types: List of TSDataType, indicate the data type for each sensor
        :param values: List, values to be inserted, for each sensor
        """
        data_types = [data_type.value for data_type in data_types]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
        status = self.__client.insertRecord(request)
        print("insert one record to device {} message: {}".format(device_id, status.message))

    def insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        insert multiple rows of data, records are independent to each other, in other words, there's no relationship
        between those records
        :param device_ids: List of String, time series paths for device
        :param times: List of Integer, timestamps for records
        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
        :param values_lst: 2-D List, values to be inserted, for each device
        """
        type_values_lst = [[data_type.value for data_type in types] for types in types_lst]
        request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
        resp = self.__client.insertRecords(request)
        print("insert multiple records to devices {} message: {}".format(device_ids, resp.statusList[0].message))

    def test_insert_record(self, device_id, timestamp, measurements, data_types, values):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param data_types: List of TSDataType, indicate the data type for each sensor
        :param values: List, values to be inserted, for each sensor
        """
        data_types = [data_type.value for data_type in data_types]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
        status = self.__client.testInsertRecord(request)
        print("testing! insert one record to device {} message: {}".format(device_id, status.message))

    def test_insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param device_ids: List of String, time series paths for device
        :param times: List of Integer, timestamps for records
        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
        :param values_lst: 2-D List, values to be inserted, for each device
        """
        type_values_lst = [[data_type.value for data_type in types] for types in types_lst]
        request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
        status = self.__client.testInsertRecords(request)
        print("testing! insert multiple records, message: {}".format(status.message))

    def gen_insert_record_req(self, device_id, timestamp, measurements, data_types, values):
        """
        build a TSInsertRecordReq, serializing the row values to bytes
        :raise ValueError: if measurements, data_types and values differ in length
        """
        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
            # bug fix: previously printed and returned None, which made the
            # subsequent thrift call fail with a confusing error.
            raise ValueError("length of data types does not equal to length of values!")
        values_in_bytes = Session.value_to_bytes(data_types, values)
        return TSInsertRecordReq(self.__session_id, device_id, measurements, values_in_bytes, timestamp)

    def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        build a TSInsertRecordsReq for multiple independent rows
        :raise ValueError: if the outer lists (or any row's inner lists) differ in length
        """
        if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
           (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
            raise ValueError("deviceIds, times, measurementsList and valuesList's size should be equal")

        value_lst = []
        for values, data_types, measurements in zip(values_lst, types_lst, measurements_lst):
            if (len(values) != len(data_types)) or (len(values) != len(measurements)):
                raise ValueError("deviceIds, times, measurementsList and valuesList's size should be equal")
            values_in_bytes = Session.value_to_bytes(data_types, values)
            value_lst.append(values_in_bytes)

        return TSInsertRecordsReq(self.__session_id, device_ids, measurements_lst, value_lst, times)

    def insert_tablet(self, tablet):
        """
        insert one tablet, in a tablet, for each timestamp, the number of measurements is same
            for example three records in the same device can form a tablet:
                timestamps,     m1,    m2,     m3
                         1,  125.3,  True,  text1
                         2,  111.6, False,  text2
                         3,  688.6,  True,  text3
        Notice: The tablet should not have empty cell
                The tablet itself is sorted (see docs of Tablet.py)
        :param tablet: a tablet specified above
        """
        resp = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
        print("insert one tablet to device {} message: {}".format(tablet.get_device_id(), resp.statusList[0].message))

    def insert_tablets(self, tablet_lst):
        """
        insert multiple tablets, tablets are independent to each other
        :param tablet_lst: List of tablets
        """
        resp = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
        print("insert multiple tablets, message: {}".format(resp.statusList[0].message))

    def test_insert_tablet(self, tablet):
        """
         this method NOT insert data into database and the server just return after accept the request, this method
         should be used to test other time cost in client
        :param tablet: a tablet of data
        """
        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
        print("testing! insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message))

    def test_insert_tablets(self, tablet_list):
        """
         this method NOT insert data into database and the server just return after accept the request, this method
         should be used to test other time cost in client
        :param tablet_list: List of tablets
        """
        status = self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list))
        print("testing! insert multiple tablets, message: {}".format(status.message))

    def gen_insert_tablet_req(self, tablet):
        """build a TSInsertTabletReq from one Tablet"""
        data_type_values = [data_type.value for data_type in tablet.get_data_types()]
        return TSInsertTabletReq(self.__session_id, tablet.get_device_id(), tablet.get_measurements(),
                                 tablet.get_binary_values(), tablet.get_binary_timestamps(),
                                 data_type_values, tablet.get_row_number())

    def gen_insert_tablets_req(self, tablet_lst):
        """build a TSInsertTabletsReq from a list of Tablets (column-wise lists)"""
        device_id_lst = []
        measurements_lst = []
        values_lst = []
        timestamps_lst = []
        type_lst = []
        size_lst = []
        for tablet in tablet_lst:
            data_type_values = [data_type.value for data_type in tablet.get_data_types()]
            device_id_lst.append(tablet.get_device_id())
            measurements_lst.append(tablet.get_measurements())
            values_lst.append(tablet.get_binary_values())
            timestamps_lst.append(tablet.get_binary_timestamps())
            type_lst.append(data_type_values)
            size_lst.append(tablet.get_row_number())
        return TSInsertTabletsReq(self.__session_id, device_id_lst, measurements_lst,
                                  values_lst, timestamps_lst, type_lst, size_lst)

    def execute_query_statement(self, sql):
        """
        execute query sql statement and returns SessionDataSet
        :param sql: String, query sql statement
        :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
        """
        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
        resp = self.__client.executeQueryStatement(request)
        return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
                              self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)

    def execute_non_query_statement(self, sql):
        """
        execute non-query sql statement
        :param sql: String, non-query sql statement
        """
        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
        try:
            resp = self.__client.executeUpdateStatement(request)
            status = resp.status
            print("execute non-query statement {} message: {}".format(sql, status.message))
        except TTransport.TException as e:
            print("execution of non-query statement fails because: ", e)

    @staticmethod
    def value_to_bytes(data_types, values):
        """
        serialize one row of values into the big-endian wire format the
        server expects: for each value, an int16 type tag followed by the
        value itself (TEXT is length-prefixed with an int32).
        :param data_types: List of TSDataType *values* (ints, not enum members)
        :param values: List of python values matching data_types
        :return: bytes, the packed row
        :raise ValueError: on an unsupported data type
        """
        format_str_list = [">"]
        values_tobe_packed = []
        for data_type, value in zip(data_types, values):
            if data_type == TSDataType.BOOLEAN.value:
                format_str_list.append("h")
                format_str_list.append("?")
                values_tobe_packed.append(TSDataType.BOOLEAN.value)
                values_tobe_packed.append(value)
            elif data_type == TSDataType.INT32.value:
                format_str_list.append("h")
                format_str_list.append("i")
                values_tobe_packed.append(TSDataType.INT32.value)
                values_tobe_packed.append(value)
            elif data_type == TSDataType.INT64.value:
                format_str_list.append("h")
                format_str_list.append("q")
                values_tobe_packed.append(TSDataType.INT64.value)
                values_tobe_packed.append(value)
            elif data_type == TSDataType.FLOAT.value:
                format_str_list.append("h")
                format_str_list.append("f")
                values_tobe_packed.append(TSDataType.FLOAT.value)
                values_tobe_packed.append(value)
            elif data_type == TSDataType.DOUBLE.value:
                format_str_list.append("h")
                format_str_list.append("d")
                values_tobe_packed.append(TSDataType.DOUBLE.value)
                values_tobe_packed.append(value)
            elif data_type == TSDataType.TEXT.value:
                value_bytes = bytes(value, 'utf-8')
                format_str_list.append("h")
                format_str_list.append("i")
                format_str_list.append(str(len(value_bytes)))
                format_str_list.append("s")
                values_tobe_packed.append(TSDataType.TEXT.value)
                values_tobe_packed.append(len(value_bytes))
                values_tobe_packed.append(value_bytes)
            else:
                # bug fix: previously printed and returned None, producing a
                # malformed request downstream.
                raise ValueError("Unsupported data type:" + str(data_type))
        format_str = ''.join(format_str_list)
        return struct.pack(format_str, *values_tobe_packed)

    def get_time_zone(self):
        """
        :return: String, the session's time zone (cached after the first RPC)
        :raise Exception: if the RPC fails
        """
        if self.__zone_id is not None:
            return self.__zone_id
        try:
            resp = self.__client.getTimeZone(self.__session_id)
        except TTransport.TException as e:
            print("Could not get time zone because: ", e)
            raise Exception("Could not get time zone") from e
        return resp.timeZone

    def set_time_zone(self, zone_id):
        """
        :param zone_id: String, time zone id to use for this session
        :raise Exception: if the RPC fails
        """
        request = TSSetTimeZoneReq(self.__session_id, zone_id)
        try:
            status = self.__client.setTimeZone(request)
            print("setting time zone_id as {}, message: {}".format(zone_id, status.message))
        except TTransport.TException as e:
            # bug fix: message previously said "get" (copy-paste error)
            print("Could not set time zone because: ", e)
            raise Exception("Could not set time zone") from e
        self.__zone_id = zone_id
diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
new file mode 100644
index 0000000..2aac6e9
--- /dev/null
+++ b/client-py/src/SessionExample.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+sys.path.append("./utils")
+from IoTDBConstants import *
+from Tablet import Tablet
+from Session import Session
+
# creating session connection.
# NOTE(review): the port is passed as a string here; TSocket appears to
# accept it, but an int would be the conventional type — confirm.
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)

# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
session.delete_storage_group("root.sg_test_02")
session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])

# setting time series one at a time.
session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)

# setting multiple time series once (s_04 .. s_09, one encoding/compressor per series).
ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
                "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
                  TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)

# delete time series s_07 .. s_09 again (schema and data)
session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"])

# checking time series existence: s_07 was just deleted, s_03 still exists
print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"))

# insert one record (one value per sensor s_01 .. s_06) into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
               TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)

# insert multiple independent records into database (timestamps 2 and 3)
measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
                      ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
                [True, 77, 88, 1.25, 8.125, "test_records02"]]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_)

# insert one tablet (4 rows x 6 sensors) into the database.
values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
           [True, 100, 11111, 1.25, 101.0, "test02"],
           [False, 100, 1, 188.1, 688.25, "test03"],
           [True, 0, 0, 0, 6.25, "test04"]]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
session.insert_tablet(tablet_)

# insert multiple tablets into database (same values, later timestamps)
tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
session.insert_tablets([tablet_01, tablet_02])

# execute non-query sql statement
session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")

# execute sql query statement and iterate the result set row by row
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
    print(session_data_set.next())
session_data_set.close_operation_handle()

# close session connection.
session.close()

print("All executions done!!")
diff --git a/client-py/src/SessionUT.py b/client-py/src/SessionUT.py
new file mode 100644
index 0000000..fc79cfe
--- /dev/null
+++ b/client-py/src/SessionUT.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import sys
+import struct
+
+sys.path.append("./utils")
+
+from Session import Session
+from Tablet import Tablet
+from IoTDBConstants import *
+from SessionDataSet import SessionDataSet
+
+from thrift.protocol import TBinaryProtocol, TCompactProtocol
+from thrift.transport import TSocket, TTransport
+
+
class MyTestCase(unittest.TestCase):
    """
    Integration tests for the python Session client.  They require a
    running IoTDB server on 127.0.0.1:6667 with the default credentials.
    """

    def setUp(self) -> None:
        # a fresh session per test, closed again in tearDown
        self.session = Session("127.0.0.1", 6667, "root", "root")
        self.session.open(False)

    def tearDown(self) -> None:
        self.session.close()

    def test_insert_by_str(self):
        """insert TEXT rows via insert_str_record and via raw SQL, then verify with a query"""
        self.session.set_storage_group("root.sg1")

        self.__createTimeSeries(self.session)
        self.insertByStr(self.session)

        # sql test
        self.insert_via_sql(self.session)
        self.query3()

    def test_insert_by_blank_str_infer_type(self):
        """a measurement name with a trailing blank must not create a time series"""
        device_id = "root.sg1.d1"
        measurements = ["s1"]
        values = ["1.0"]
        self.session.insert_str_record(device_id, 1, measurements, values)

        # bug fix: `expected` was a plain string, so expected[i] compared a
        # single character against a full timeseries name; use a list instead.
        expected = ["root.sg1.d1.s1 "]

        self.assertFalse(self.session.check_time_series_exists("root.sg1.d1.s1 "))
        data_set = self.session.execute_query_statement("show timeseries")
        i = 0
        # bug fix: SessionDataSet exposes has_next(), not the Java-style hasNext()
        while data_set.has_next():
            # NOTE(review): get_fields().get(0) assumes get_fields() returns a
            # dict-like object; if it is a plain list this should be [0] — confirm
            # against RowRecord.py.
            self.assertEqual(expected[i], str(data_set.next().get_fields().get(0)))
            i += 1

    @staticmethod
    def __createTimeSeries(session):
        """create six TEXT series under root.sg1.d1 and root.sg1.d2"""
        session.create_time_series("root.sg1.d1.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d1.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d1.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)

    @staticmethod
    def insertByStr(session):
        """insert 100 rows (timestamps 0..99) with string values "1","2","3" """
        device_id = "root.sg1.d1"
        measurements = ["s1", "s2", "s3"]

        for time in range(100):
            values = ["1", "2", "3"]
            session.insert_str_record(device_id, time, measurements, values)

    @staticmethod
    def insert_via_sql(session):
        """insert the 101st row (timestamp 100) through a raw SQL statement"""
        session.execute_non_query_statement("insert into root.sg1.d1(timestamp, s1, s2, s3) values(100, 1, 2, 3)")

    def query3(self):
        """verify all 101 rows are present and each field holds its column index"""
        session_data_set = self.session.execute_query_statement("select * from root.sg1.d1")
        session_data_set.set_fetch_size(1024)
        count = 0
        while session_data_set.has_next():
            index = 1
            count += 1
            for f in session_data_set.next().get_fields():
                self.assertEqual(str(index), f.get_string_value())
                index += 1

        self.assertEqual(101, count)
        session_data_set.close_operation_handle()
+
+
# Run the integration tests when executed directly (requires a live IoTDB server).
if __name__ == '__main__':
    unittest.main()
diff --git a/client-py/src/utils/Field.py b/client-py/src/utils/Field.py
new file mode 100644
index 0000000..7b5fea5
--- /dev/null
+++ b/client-py/src/utils/Field.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+
+
class Field(object):
    """A single typed value inside a row record; a data_type of None marks a null field."""

    def __init__(self, data_type):
        """
        :param data_type: TSDataType of the value, or None for a null field
        """
        self.__data_type = data_type
        self.__bool_value = None
        self.__int_value = None
        self.__long_value = None
        self.__float_value = None
        self.__double_value = None
        self.__binary_value = None

    @staticmethod
    def copy(field):
        """Return a new Field carrying the same data type and value as *field*."""
        duplicate = Field(field.get_data_type())
        kind = duplicate.get_data_type()
        if kind is None:
            return duplicate
        transfers = {
            TSDataType.BOOLEAN: (field.get_bool_value, duplicate.set_bool_value),
            TSDataType.INT32: (field.get_int_value, duplicate.set_int_value),
            TSDataType.INT64: (field.get_long_value, duplicate.set_long_value),
            TSDataType.FLOAT: (field.get_float_value, duplicate.set_float_value),
            TSDataType.DOUBLE: (field.get_double_value, duplicate.set_double_value),
            TSDataType.TEXT: (field.get_binary_value, duplicate.set_binary_value),
        }
        if kind not in transfers:
            raise Exception("unsupported data type {}".format(kind))
        read, write = transfers[kind]
        write(read())
        return duplicate

    def get_data_type(self):
        return self.__data_type

    def is_null(self):
        return self.__data_type is None

    def __check_not_null(self):
        # every typed getter refuses to read from a null field
        if self.__data_type is None:
            raise Exception("Null Field Exception!")

    def set_bool_value(self, value):
        self.__bool_value = value

    def get_bool_value(self):
        self.__check_not_null()
        return self.__bool_value

    def set_int_value(self, value):
        self.__int_value = value

    def get_int_value(self):
        self.__check_not_null()
        return self.__int_value

    def set_long_value(self, value):
        self.__long_value = value

    def get_long_value(self):
        self.__check_not_null()
        return self.__long_value

    def set_float_value(self, value):
        self.__float_value = value

    def get_float_value(self):
        self.__check_not_null()
        return self.__float_value

    def set_double_value(self, value):
        self.__double_value = value

    def get_double_value(self):
        self.__check_not_null()
        return self.__double_value

    def set_binary_value(self, value):
        self.__binary_value = value

    def get_binary_value(self):
        self.__check_not_null()
        return self.__binary_value

    def get_string_value(self):
        """Render the stored value as text; TEXT payloads are decoded as UTF-8."""
        if self.__data_type is None:
            return "None"
        if self.__data_type == TSDataType.TEXT:
            return self.__binary_value.decode('utf-8')
        raw_values = {
            TSDataType.BOOLEAN: self.__bool_value,
            TSDataType.INT64: self.__long_value,
            TSDataType.INT32: self.__int_value,
            TSDataType.FLOAT: self.__float_value,
            TSDataType.DOUBLE: self.__double_value,
        }
        if self.__data_type not in raw_values:
            raise Exception("unsupported data type {}".format(self.__data_type))
        return str(raw_values[self.__data_type])

    def __str__(self):
        return self.get_string_value()

    def get_object_value(self, data_type):
        """
        Return the raw value read as *data_type*; None if this field is null.
        :param data_type: TSDataType
        """
        if self.__data_type is None:
            return None
        readers = {
            TSDataType.BOOLEAN: self.get_bool_value,
            TSDataType.INT32: self.get_int_value,
            TSDataType.INT64: self.get_long_value,
            TSDataType.FLOAT: self.get_float_value,
            TSDataType.DOUBLE: self.get_double_value,
            TSDataType.TEXT: self.get_binary_value,
        }
        if data_type not in readers:
            raise Exception("unsupported data type {}".format(data_type))
        return readers[data_type]()

    @staticmethod
    def get_field(value, data_type):
        """
        Wrap *value* in a Field of *data_type*; a None value yields None.
        :param value: field value corresponding to the data type
        :param data_type: TSDataType
        """
        if value is None:
            return None
        field = Field(data_type)
        writers = {
            TSDataType.BOOLEAN: field.set_bool_value,
            TSDataType.INT32: field.set_int_value,
            TSDataType.INT64: field.set_long_value,
            TSDataType.FLOAT: field.set_float_value,
            TSDataType.DOUBLE: field.set_double_value,
            TSDataType.TEXT: field.set_binary_value,
        }
        if data_type not in writers:
            raise Exception("unsupported data type {}".format(data_type))
        writers[data_type](value)
        return field
+
diff --git a/client-py/src/utils/IoTDBConstants.py b/client-py/src/utils/IoTDBConstants.py
new file mode 100644
index 0000000..7e31262
--- /dev/null
+++ b/client-py/src/utils/IoTDBConstants.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from enum import Enum, unique
+
+
@unique
class TSDataType(Enum):
    """Column data types supported for IoTDB time series values.

    The numeric codes are sent as-is over the session RPC — presumably they
    match the server-side enum ordinals; verify against the Thrift definitions.
    """
    BOOLEAN = 0
    INT32 = 1
    INT64 = 2
    FLOAT = 3
    DOUBLE = 4
    TEXT = 5
+
+
@unique
class TSEncoding(Enum):
    """Encoding schemes selectable when creating a time series (see create_time_series)."""
    PLAIN = 0
    PLAIN_DICTIONARY = 1
    RLE = 2
    DIFF = 3
    TS_2DIFF = 4
    BITMAP = 5
    GORILLA = 6
    REGULAR = 7
+
+
@unique
class Compressor(Enum):
    """Compression types selectable when creating a time series (see create_time_series)."""
    UNCOMPRESSED = 0
    SNAPPY = 1
    GZIP = 2
    LZO = 3
    SDT = 4
    PAA = 5
    PLA = 6
diff --git a/client-py/src/utils/IoTDBRpcDataSet.py b/client-py/src/utils/IoTDBRpcDataSet.py
new file mode 100644
index 0000000..f712c98
--- /dev/null
+++ b/client-py/src/utils/IoTDBRpcDataSet.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import *
+
+from thrift.transport import TTransport
+from iotdb.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq
+
+
class IoTDBRpcDataSet(object):
    """Low-level cursor over one query's result, fetched batch-wise over Thrift RPC.

    Results arrive column-major: one byte buffer per deduplicated column, a
    per-column null bitmap, and a shared timestamp buffer; construct_one_row()
    consumes exactly one row from those buffers per call.
    """

    # display name of the synthetic timestamp column
    TIMESTAMP_STR = "Time"
    # VALUE_IS_NULL = "The value got by %s (column name) is NULL."
    # ordinal of the first value column (ordinal 1 is reserved for the timestamp)
    START_INDEX = 2
    # mask for the highest bit of a bitmap byte (first row of each 8-row group)
    FLAG = 0x80

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
                 client, session_id, query_data_set, fetch_size):
        """
        :param sql: str, original query text (resent to the server on each fetch)
        :param column_name_list: list of str, display column names
        :param column_type_list: list of str, TSDataType names parallel to column_name_list
        :param column_name_index: dict mapping column name -> server buffer index,
               or None when the server provided no deduplication info
        :param ignore_timestamp: bool, True for result sets without a time column
        :param query_id: server-side handle of this query
        :param client: Thrift TSIService client used for fetchResults/closeOperation
        :param session_id: id of the session the query belongs to
        :param query_data_set: first batch of raw result buffers
        :param fetch_size: int, row count requested per fetchResults RPC
        """
        self.__session_id = session_id
        self.__ignore_timestamp = ignore_timestamp
        self.__sql = sql
        self.__query_id = query_id
        self.__client = client
        self.__fetch_size = fetch_size
        self.__column_size = len(column_name_list)

        self.__column_name_list = []
        self.__column_type_list = []
        # maps display column name -> 1-based ordinal; duplicated names share one entry
        self.__column_ordinal_dict = {}
        if not ignore_timestamp:
            self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
            self.__column_type_list.append(TSDataType.INT64)
            self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1

        if column_name_index is not None:
            # the server told us which deduplicated buffer each column lives in
            self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))]
            for i in range(len(column_name_list)):
                name = column_name_list[i]
                self.__column_name_list.append(name)
                self.__column_type_list.append(TSDataType[column_type_list[i]])
                if name not in self.__column_ordinal_dict:
                    index = column_name_index[name]
                    self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
                    self.__column_type_deduplicated_list[index] = TSDataType[column_type_list[i]]
        else:
            # no index from the server: assign buffer ordinals in first-seen order
            index = IoTDBRpcDataSet.START_INDEX
            self.__column_type_deduplicated_list = []
            for i in range(len(column_name_list)):
                name = column_name_list[i]
                self.__column_name_list.append(name)
                self.__column_type_list.append(TSDataType[column_type_list[i]])
                if name not in self.__column_ordinal_dict:
                    self.__column_ordinal_dict[name] = index
                    index += 1
                    self.__column_type_deduplicated_list.append(TSDataType[column_type_list[i]])

        # per-row scratch state, refilled by construct_one_row()
        self.__time_bytes = bytes(0)
        self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))]
        self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))]
        self.__query_data_set = query_data_set
        self.__is_closed = False
        self.__empty_resultSet = False
        self.__has_cached_record = False
        self.__rows_index = 0

    def close(self):
        """Release the server-side operation handle; safe to call more than once."""
        if self.__is_closed:
            return
        if self.__client is not None:
            try:
                status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id))
                print("close session {}, message: {}".format(self.__session_id, status.message))
            except TTransport.TException as e:
                print("close session {} failed because: ".format(self.__session_id), e)
                raise Exception

            self.__is_closed = True
            self.__client = None

    def next(self):
        """Advance to the next row; True if one was cached, False at end of data."""
        if self.has_cached_result():
            self.construct_one_row()
            return True
        if self.__empty_resultSet:
            return False
        if self.fetch_results():
            self.construct_one_row()
            return True
        return False

    def has_cached_result(self):
        # rows remain in the current batch as long as the timestamp buffer is non-empty
        return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0)

    def construct_one_row(self):
        """Peel one row off the batch buffers into __time_bytes / __current_bitmap / __value."""
        # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
        self.__time_bytes = self.__query_data_set.time[:8]
        self.__query_data_set.time = self.__query_data_set.time[8:]
        for i in range(len(self.__query_data_set.bitmapList)):
            bitmap_buffer = self.__query_data_set.bitmapList[i]

            # another 8 new rows, should move the bitmap buffer position to next byte
            if self.__rows_index % 8 == 0:
                self.__current_bitmap[i] = bitmap_buffer[0]
                self.__query_data_set.bitmapList[i] = bitmap_buffer[1:]
            if not self.is_null(i, self.__rows_index):
                value_buffer = self.__query_data_set.valueList[i]
                data_type = self.__column_type_deduplicated_list[i]

                # simulating buffer
                if data_type == TSDataType.BOOLEAN:
                    self.__value[i] = value_buffer[:1]
                    self.__query_data_set.valueList[i] = value_buffer[1:]
                elif data_type == TSDataType.INT32:
                    self.__value[i] = value_buffer[:4]
                    self.__query_data_set.valueList[i] = value_buffer[4:]
                elif data_type == TSDataType.INT64:
                    self.__value[i] = value_buffer[:8]
                    self.__query_data_set.valueList[i] = value_buffer[8:]
                elif data_type == TSDataType.FLOAT:
                    self.__value[i] = value_buffer[:4]
                    self.__query_data_set.valueList[i] = value_buffer[4:]
                elif data_type == TSDataType.DOUBLE:
                    self.__value[i] = value_buffer[:8]
                    self.__query_data_set.valueList[i] = value_buffer[8:]
                elif data_type == TSDataType.TEXT:
                    # TEXT cells are length-prefixed: 4-byte big-endian length, then the payload
                    length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False)
                    self.__value[i] = value_buffer[4: 4 + length]
                    self.__query_data_set.valueList[i] = value_buffer[4 + length:]
                else:
                    print("unsupported data type {}.".format(data_type))
                    # could raise exception here
        self.__rows_index += 1
        self.__has_cached_record = True

    def fetch_results(self):
        """Request the next batch from the server; True if more rows exist.

        NOTE(review): on a network error this only prints and falls through,
        implicitly returning None (falsy) — callers then treat it as end-of-data.
        """
        self.__rows_index = 0
        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
        try:
            resp = self.__client.fetchResults(request)
            if not resp.hasResultSet:
                self.__empty_resultSet = True
            else:
                self.__query_data_set = resp.queryDataSet
            return resp.hasResultSet
        except TTransport.TException as e:
            print("Cannot fetch result from server, because of network connection: ", e)

    def is_null(self, index, row_num):
        """True if the cell in deduplicated column *index* at row *row_num* is null."""
        bitmap = self.__current_bitmap[index]
        shift = row_num % 8
        return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0

    def is_null_by_index(self, column_index):
        """Null-check by 1-based display column index.

        NOTE(review): returns True for the time column (index < 0) despite the
        comment below; callers appear to only pass value-column indices — verify.
        """
        index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        if index < 0:
            return True
        return self.is_null(index, self.__rows_index - 1)

    def is_null_by_name(self, column_name):
        """Null-check by display column name (same caveat as is_null_by_index)."""
        index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        if index < 0:
            return True
        return self.is_null(index, self.__rows_index - 1)

    def find_column_name_by_index(self, column_index):
        """Translate a 1-based display index into a column name; raises on a bad index."""
        if column_index <= 0:
            raise Exception("Column index should start from 1")
        if column_index > len(self.__column_name_list):
            raise Exception("column index {} out of range {}".format(column_index, self.__column_size))
        return self.__column_name_list[column_index - 1]

    # --- plain accessors -------------------------------------------------

    def get_fetch_size(self):
        return self.__fetch_size

    def set_fetch_size(self, fetch_size):
        self.__fetch_size = fetch_size

    def get_column_names(self):
        return self.__column_name_list

    def get_column_types(self):
        return self.__column_type_list

    def get_column_size(self):
        return self.__column_size

    def get_ignore_timestamp(self):
        return self.__ignore_timestamp

    def get_column_ordinal_dict(self):
        return self.__column_ordinal_dict

    def get_column_type_deduplicated_list(self):
        return self.__column_type_deduplicated_list

    def get_values(self):
        return self.__value

    def get_time_bytes(self):
        return self.__time_bytes

    def get_has_cached_record(self):
        return self.__has_cached_record
diff --git a/client-py/src/utils/RowRecord.py b/client-py/src/utils/RowRecord.py
new file mode 100644
index 0000000..78e0362
--- /dev/null
+++ b/client-py/src/utils/RowRecord.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+from Field import Field
+
+
class RowRecord(object):
    """One query result row: a timestamp plus its Field values in column order."""

    def __init__(self, timestamp, field_list=None):
        """
        :param timestamp: long, row timestamp
        :param field_list: list of Field, one per column (defaults to an empty list)
        """
        self.__timestamp = timestamp
        # default to a fresh list so add_field() works out of the box;
        # the original kept None, making add_field crash for the default case
        self.__field_list = field_list if field_list is not None else []

    def add_field(self, value, data_type=None):
        """
        Append one field to the row.

        BUG FIX: the original defined add_field twice, so the single-argument
        overload (appending a ready-made Field) was silently shadowed by the
        two-argument one.  This merged version supports both call shapes:
          - add_field(field)             appends the Field object unchanged
          - add_field(value, data_type)  wraps value via Field.get_field first
        """
        if data_type is None:
            self.__field_list.append(value)
        else:
            self.__field_list.append(Field.get_field(value, data_type))

    def __str__(self):
        # timestamp and field values joined by double tabs
        str_list = [str(self.__timestamp)]
        for field in self.__field_list:
            str_list.append("\t\t")
            str_list.append(str(field))
        return "".join(str_list)

    def get_timestamp(self):
        return self.__timestamp

    def set_timestamp(self, timestamp):
        self.__timestamp = timestamp

    def get_fields(self):
        return self.__field_list

    def set_fields(self, field_list):
        self.__field_list = field_list

    def set_field(self, index, field):
        """Replace the field at *index* (0-based)."""
        self.__field_list[index] = field
diff --git a/client-py/src/utils/SessionDataSet.py b/client-py/src/utils/SessionDataSet.py
new file mode 100644
index 0000000..92662cc
--- /dev/null
+++ b/client-py/src/utils/SessionDataSet.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import TSDataType
+from IoTDBRpcDataSet import IoTDBRpcDataSet
+from Field import Field
+from RowRecord import RowRecord
+import struct
+
+
class SessionDataSet(object):
    """High-level query result iterator returning RowRecord objects.

    Wraps IoTDBRpcDataSet, which owns the raw column buffers fetched over RPC.
    Typical usage:
        while data_set.has_next():
            record = data_set.next()
    """

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id,
                 query_data_set, ignore_timestamp):
        self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index,
                                                  ignore_timestamp, query_id, client, session_id, query_data_set, 1024)

    def get_fetch_size(self):
        return self.iotdb_rpc_data_set.get_fetch_size()

    def set_fetch_size(self, fetch_size):
        self.iotdb_rpc_data_set.set_fetch_size(fetch_size)

    def get_column_names(self):
        return self.iotdb_rpc_data_set.get_column_names()

    def get_column_types(self):
        return self.iotdb_rpc_data_set.get_column_types()

    def has_next(self):
        """Advance the cursor; True if another row is available (and now cached)."""
        return self.iotdb_rpc_data_set.next()

    def next(self):
        """Return the next row as a RowRecord, or None when the result set is exhausted."""
        if not self.iotdb_rpc_data_set.get_has_cached_record():
            if not self.has_next():
                return None
        # BUG FIX: the original assigned `self.iotdb_rpc_data_set.has_cached_record = False`,
        # which only created a brand-new public attribute and left the private,
        # name-mangled `__has_cached_record` flag set — so back-to-back next()
        # calls without interleaved has_next() calls replayed the same cached
        # row forever.  Clear the real (mangled) flag instead.
        self.iotdb_rpc_data_set._IoTDBRpcDataSet__has_cached_record = False
        return self.construct_row_record_from_value_array()

    def construct_row_record_from_value_array(self):
        """Decode the row cached in the rpc data set's value buffers into a RowRecord."""
        out_fields = []
        for i in range(self.iotdb_rpc_data_set.get_column_size()):
            index = i + 1
            data_set_column_index = i + IoTDBRpcDataSet.START_INDEX
            if self.iotdb_rpc_data_set.get_ignore_timestamp():
                # without a time column every ordinal shifts down by one
                index -= 1
                data_set_column_index -= 1
            column_name = self.iotdb_rpc_data_set.get_column_names()[index]
            location = self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - IoTDBRpcDataSet.START_INDEX

            if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
                value_bytes = self.iotdb_rpc_data_set.get_values()[location]
                data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[location]
                field = Field(data_type)
                # numeric payloads are decoded big-endian (">" struct format)
                if data_type == TSDataType.BOOLEAN:
                    field.set_bool_value(struct.unpack(">?", value_bytes)[0])
                elif data_type == TSDataType.INT32:
                    field.set_int_value(struct.unpack(">i", value_bytes)[0])
                elif data_type == TSDataType.INT64:
                    field.set_long_value(struct.unpack(">q", value_bytes)[0])
                elif data_type == TSDataType.FLOAT:
                    field.set_float_value(struct.unpack(">f", value_bytes)[0])
                elif data_type == TSDataType.DOUBLE:
                    field.set_double_value(struct.unpack(">d", value_bytes)[0])
                elif data_type == TSDataType.TEXT:
                    field.set_binary_value(value_bytes)
                else:
                    # the original only printed here ("could raise exception here"),
                    # yielding a field with no value; fail loudly instead
                    raise RuntimeError("unsupported data type {}.".format(data_type))
            else:
                # null cell: a Field with data_type None renders as "None"
                field = Field(None)
            out_fields.append(field)

        return RowRecord(struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields)

    def close_operation_handle(self):
        """Release the server-side query resources."""
        self.iotdb_rpc_data_set.close()
+
+
+
+
diff --git a/client-py/src/utils/Tablet.py b/client-py/src/utils/Tablet.py
new file mode 100644
index 0000000..51798a4
--- /dev/null
+++ b/client-py/src/utils/Tablet.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from IoTDBConstants import *
+import struct
+
+
class Tablet(object):
    """A batch of rows for one device, serialized column-by-column for insertion."""

    def __init__(self, device_id, measurements, data_types, values, timestamps):
        """
        creating a tablet for insertion
          for example, considering device: root.sg1.d1
            timestamps,     m1,    m2,     m3
                     1,  125.3,  True,  text1
                     2,  111.6, False,  text2
                     3,  688.6,  True,  text3
        Notice: The tablet should not have empty cell
                The tablet will be sorted at the initialization by timestamps

        :param device_id: String, IoTDB time series path to device layer (without sensor).
        :param measurements: List, sensors.
        :param data_types: TSDataType List, specify value types for sensors.
        :param values: 2-D List, the values of each row should be the outer list element.
        :param timestamps: List.
        :raises ValueError: if timestamps and values differ in length.
        """
        if len(timestamps) != len(values):
            # fail fast: the original only printed here, and a length mismatch
            # would later corrupt the serialized payload
            raise ValueError("Input error! len(timestamps) does not equal to len(values)!")

        if not Tablet.check_sorted(timestamps):
            # sort rows by timestamp only; comparing whole (timestamp, row) pairs
            # as the original did can raise on rows whose cell values are not
            # mutually comparable
            sorted_pairs = sorted(zip(timestamps, values), key=lambda pair: pair[0])
            self.__timestamps = [pair[0] for pair in sorted_pairs]
            self.__values = [pair[1] for pair in sorted_pairs]
        else:
            self.__values = values
            self.__timestamps = timestamps

        self.__device_id = device_id
        self.__measurements = measurements
        self.__data_types = data_types
        self.__row_number = len(timestamps)
        self.__column_number = len(measurements)

    @staticmethod
    def check_sorted(timestamps):
        """Return True if *timestamps* are in non-decreasing order."""
        return all(timestamps[i - 1] <= timestamps[i] for i in range(1, len(timestamps)))

    def get_measurements(self):
        return self.__measurements

    def get_data_types(self):
        return self.__data_types

    def get_row_number(self):
        return self.__row_number

    def get_device_id(self):
        return self.__device_id

    def get_binary_timestamps(self):
        """Serialize all timestamps as consecutive big-endian int64 values."""
        return struct.pack(">" + "q" * self.__row_number, *self.__timestamps)

    def get_binary_values(self):
        """Serialize all cell values column-by-column, big-endian.

        Fixed-width columns are packed back to back; TEXT cells are packed as a
        4-byte length prefix followed by the UTF-8 payload.
        :raises ValueError: on an unsupported column data type (the original
                printed and returned None, silently corrupting the insert).
        """
        fixed_width_format = {
            TSDataType.BOOLEAN: "?",
            TSDataType.INT32: "i",
            TSDataType.INT64: "q",
            TSDataType.FLOAT: "f",
            TSDataType.DOUBLE: "d",
        }
        format_str_list = [">"]
        values_tobe_packed = []
        for i in range(self.__column_number):
            data_type = self.__data_types[i]
            if data_type in fixed_width_format:
                format_str_list.append(str(self.__row_number))
                format_str_list.append(fixed_width_format[data_type])
                for j in range(self.__row_number):
                    values_tobe_packed.append(self.__values[j][i])
            elif data_type == TSDataType.TEXT:
                for j in range(self.__row_number):
                    value_bytes = bytes(self.__values[j][i], 'utf-8')
                    format_str_list.append("i")
                    format_str_list.append(str(len(value_bytes)))
                    format_str_list.append("s")
                    values_tobe_packed.append(len(value_bytes))
                    values_tobe_packed.append(value_bytes)
            else:
                raise ValueError("Unsupported data type:" + str(data_type))

        return struct.pack(''.join(format_str_list), *values_tobe_packed)
+
diff --git a/client-py/src/utils/__init__.py b/client-py/src/utils/__init__.py
new file mode 100644
index 0000000..a4797b6
--- /dev/null
+++ b/client-py/src/utils/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+