You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 12:48:55 UTC

[iotdb] 01/03: finish python session

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9fd48778854f9c41077a3d1047e592a2d8469f91
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 14 20:04:29 2022 +0800

    finish python session
---
 client-py/README.md                      |   2 +-
 client-py/SessionExample.py              | 762 ++++++++++++++++---------------
 client-py/iotdb/Session.py               | 224 +++++----
 client-py/iotdb/dbapi/Cursor.py          |   2 +-
 client-py/iotdb/utils/IoTDBRpcDataSet.py |  13 +-
 client-py/iotdb/utils/SessionDataSet.py  |  66 ++-
 client-py/tests/test_dataframe.py        |   4 +-
 client-py/tests/test_tablet.py           |   4 +-
 client-py/tests/test_todf.py             |   6 +-
 9 files changed, 586 insertions(+), 497 deletions(-)

diff --git a/client-py/README.md b/client-py/README.md
index 3197cc7186..c124b32881 100644
--- a/client-py/README.md
+++ b/client-py/README.md
@@ -386,7 +386,7 @@ session.open(False)
 result = session.execute_query_statement("SELECT * FROM root.*")
 
 # Transform to Pandas Dataset
-df = result.todf()
+df = result.to_df()
 
 session.close()
 
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 61e82234db..4f04855def 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -35,382 +35,398 @@ password_ = "root"
 session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
 session.open(False)
 
-# set and delete storage groups
-session.set_storage_group("root.sg_test_01")
-session.set_storage_group("root.sg_test_02")
-session.set_storage_group("root.sg_test_03")
-session.set_storage_group("root.sg_test_04")
-session.delete_storage_group("root.sg_test_02")
-session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
-
-# setting time series.
-session.create_time_series(
-    "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_02.s_01",
-    TSDataType.BOOLEAN,
-    TSEncoding.PLAIN,
-    Compressor.SNAPPY,
-    None,
-    {"tag1": "v1"},
-    {"description": "v1"},
-    "temperature",
-)
-
-# setting multiple time series once.
-ts_path_lst_ = [
-    "root.sg_test_01.d_01.s_04",
-    "root.sg_test_01.d_01.s_05",
-    "root.sg_test_01.d_01.s_06",
-    "root.sg_test_01.d_01.s_07",
-    "root.sg_test_01.d_01.s_08",
-    "root.sg_test_01.d_01.s_09",
-]
-data_type_lst_ = [
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
-    ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
-)
-
-ts_path_lst_ = [
-    "root.sg_test_01.d_02.s_04",
-    "root.sg_test_01.d_02.s_05",
-    "root.sg_test_01.d_02.s_06",
-    "root.sg_test_01.d_02.s_07",
-    "root.sg_test_01.d_02.s_08",
-    "root.sg_test_01.d_02.s_09",
-]
-data_type_lst_ = [
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
-attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
-    ts_path_lst_,
-    data_type_lst_,
-    encoding_lst_,
-    compressor_lst_,
-    None,
-    tags_lst_,
-    attributes_lst_,
-    None,
-)
-
-# delete time series
-session.delete_time_series(
-    [
-        "root.sg_test_01.d_01.s_07",
-        "root.sg_test_01.d_01.s_08",
-        "root.sg_test_01.d_01.s_09",
-    ]
-)
-
-# checking time series
-print(
-    "s_07 expecting False, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
-)
-print(
-    "s_03 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
-)
-print(
-    "d_02.s_01 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
-)
-print(
-    "d_02.s_06 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
-)
-
-# insert one record into the database.
-measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
-values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
-data_types_ = [
-    TSDataType.BOOLEAN,
-    TSDataType.INT32,
-    TSDataType.INT64,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
+ts_path_list = [
+    "root.sg1.d1.s1",
+    "root.sg1.d1.s2"
 ]
-session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
-
-# insert multiple records into database
-measurements_list_ = [
-    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
-    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
-]
-values_list_ = [
-    [False, 22, 33, 4.4, 55.1, "test_records01"],
-    [True, 77, 88, 1.25, 8.125, "test_records02"],
-]
-data_type_list_ = [data_types_, data_types_]
-device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
-session.insert_records(
-    device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
-)
-
-# insert one tablet into the database.
-values_ = [
-    [False, 10, 11, 1.1, 10011.1, "test01"],
-    [True, 100, 11111, 1.25, 101.0, "test02"],
-    [False, 100, 1, 188.1, 688.25, "test03"],
-    [True, 0, 0, 0, 6.25, "test04"],
-]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [4, 5, 6, 7]
-tablet_ = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert one numpy tablet into the database.
-np_values_ = [
-    np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
-    np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
-    np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
-    np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
-    np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
-    np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
-]
-np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
-np_tablet_ = NumpyTablet(
-    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
-)
-session.insert_tablet(np_tablet_)
-
-# insert one unsorted numpy tablet into the database.
-np_values_unsorted = [
-    np.array([False, False, False, True, True], np.dtype(">?")),
-    np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
-    np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
-    np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
-    np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
-    np.array(["test09", "test08", "test07", "test06", "test05"]),
-]
-np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
-np_tablet_unsorted = NumpyTablet(
-    "root.sg_test_01.d_02",
-    measurements_,
-    data_types_,
-    np_values_unsorted,
-    np_timestamps_unsorted,
-)
-session.insert_tablet(np_tablet_unsorted)
-print(np_tablet_unsorted.get_timestamps())
-for value in np_tablet_unsorted.get_values():
-    print(value)
-
-# insert multiple tablets into database
-tablet_01 = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
-)
-tablet_02 = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
-)
-session.insert_tablets([tablet_01, tablet_02])
-
-# insert one tablet with empty cells into the database.
-values_ = [
-    [None, 10, 11, 1.1, 10011.1, "test01"],
-    [True, None, 11111, 1.25, 101.0, "test02"],
-    [False, 100, 1, None, 688.25, "test03"],
-    [True, 0, 0, 0, 6.25, None],
-]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [16, 17, 18, 19]
-tablet_ = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert records of one device
-time_list = [1, 2, 3]
-measurements_list = [
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-]
-data_types_list = [
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-]
-values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
-
-session.insert_records_of_one_device(
-    "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
-)
-
-# execute non-query sql statement
-session.execute_non_query_statement(
-    "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# execute sql query statement
-with session.execute_query_statement(
-    "select * from root.sg_test_01.d_01"
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-# execute sql query statement
-with session.execute_query_statement(
-    "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-# execute statement
-with session.execute_statement(
-    "select * from root.sg_test_01.d_01"
-) as session_data_set:
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-session.execute_statement(
-    "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# insert string records of one device
-time_list = [1, 2, 3]
-measurements_list = [
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-]
-values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
-
-session.insert_string_records_of_one_device(
-    "root.sg_test_01.d_03",
-    time_list,
-    measurements_list,
-    values_list,
-)
-
-with session.execute_raw_data_query(
-    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-with session.execute_last_data_query(
-    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-# delete storage group
-session.delete_storage_group("root.sg_test_01")
-
-# create measurement node template
-template = Template(name="template_python", share_time=False)
-m_node_1 = MeasurementNode(
-    name="s1",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-m_node_2 = MeasurementNode(
-    name="s2",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-m_node_3 = MeasurementNode(
-    name="s3",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-template.add_template(m_node_1)
-template.add_template(m_node_2)
-template.add_template(m_node_3)
-session.create_schema_template(template)
-print("create template success template_python")
-
-# create internal node template
-template_name = "treeTemplate_python"
-template = Template(name=template_name, share_time=True)
-i_node_gps = InternalNode(name="GPS", share_time=False)
-i_node_v = InternalNode(name="vehicle", share_time=True)
-m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
-
-i_node_gps.add_child(m_node_x)
-i_node_v.add_child(m_node_x)
-template.add_template(i_node_gps)
-template.add_template(i_node_v)
-template.add_template(m_node_x)
 
-session.create_schema_template(template)
-print("create template success treeTemplate_python}")
-
-print(session.is_measurement_in_template(template_name, "GPS"))
-print(session.is_measurement_in_template(template_name, "GPS.x"))
-print(session.show_all_templates())
-
-# # append schema template
-data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
-encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
-compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
-
-measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
-session.add_measurements_in_template(
-    template_name,
-    measurements_aligned_path,
-    data_types,
-    encoding_list,
-    compressor_list,
-    is_aligned=True,
-)
-# session.drop_schema_template("add_template_python")
-measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
-session.add_measurements_in_template(
-    template_name,
-    measurements_aligned_path,
-    data_types,
-    encoding_list,
-    compressor_list,
-    is_aligned=False,
-)
-session.delete_node_in_template(template_name, "aligned.s1")
-print(session.count_measurements_in_template(template_name))
-print(session.is_path_exist_in_template(template_name, "aligned.s1"))
-print(session.is_path_exist_in_template(template_name, "aligned.s2"))
-
-session.set_schema_template(template_name, "root.python.set")
-print(session.show_paths_template_using_on(template_name))
-print(session.show_paths_template_set_on(template_name))
-session.unset_schema_template(template_name, "root.python.set")
+fetch_args = {
+    "start_time": 0,
+    "end_time": 32,
+    "interval": 4,
+    "sliding_step": 1,
+    "indexes": [0, 3, 5, 9]
+}
+
+print(session.fetch_window_batch(ts_path_list, None, fetch_args))
+
+# # set and delete storage groups
+# session.set_storage_group("root.sg_test_01")
+# session.set_storage_group("root.sg_test_02")
+# session.set_storage_group("root.sg_test_03")
+# session.set_storage_group("root.sg_test_04")
+# session.delete_storage_group("root.sg_test_02")
+# session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+#
+# # setting time series.
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_02.s_01",
+#     TSDataType.BOOLEAN,
+#     TSEncoding.PLAIN,
+#     Compressor.SNAPPY,
+#     None,
+#     {"tag1": "v1"},
+#     {"description": "v1"},
+#     "temperature",
+# )
+#
+# # setting multiple time series once.
+# ts_path_lst_ = [
+#     "root.sg_test_01.d_01.s_04",
+#     "root.sg_test_01.d_01.s_05",
+#     "root.sg_test_01.d_01.s_06",
+#     "root.sg_test_01.d_01.s_07",
+#     "root.sg_test_01.d_01.s_08",
+#     "root.sg_test_01.d_01.s_09",
+# ]
+# data_type_lst_ = [
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+#     ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+# )
+#
+# ts_path_lst_ = [
+#     "root.sg_test_01.d_02.s_04",
+#     "root.sg_test_01.d_02.s_05",
+#     "root.sg_test_01.d_02.s_06",
+#     "root.sg_test_01.d_02.s_07",
+#     "root.sg_test_01.d_02.s_08",
+#     "root.sg_test_01.d_02.s_09",
+# ]
+# data_type_lst_ = [
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
+# attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+#     ts_path_lst_,
+#     data_type_lst_,
+#     encoding_lst_,
+#     compressor_lst_,
+#     None,
+#     tags_lst_,
+#     attributes_lst_,
+#     None,
+# )
+#
+# # delete time series
+# session.delete_time_series(
+#     [
+#         "root.sg_test_01.d_01.s_07",
+#         "root.sg_test_01.d_01.s_08",
+#         "root.sg_test_01.d_01.s_09",
+#     ]
+# )
+#
+# # checking time series
+# print(
+#     "s_07 expecting False, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
+# )
+# print(
+#     "s_03 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
+# )
+# print(
+#     "d_02.s_01 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
+# )
+# print(
+#     "d_02.s_06 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
+# )
+#
+# # insert one record into the database.
+# measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+# values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+# data_types_ = [
+#     TSDataType.BOOLEAN,
+#     TSDataType.INT32,
+#     TSDataType.INT64,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
+#
+# # insert multiple records into database
+# measurements_list_ = [
+#     ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+#     ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+# ]
+# values_list_ = [
+#     [False, 22, 33, 4.4, 55.1, "test_records01"],
+#     [True, 77, 88, 1.25, 8.125, "test_records02"],
+# ]
+# data_type_list_ = [data_types_, data_types_]
+# device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+# session.insert_records(
+#     device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+# )
+#
+# # insert one tablet into the database.
+# values_ = [
+#     [False, 10, 11, 1.1, 10011.1, "test01"],
+#     [True, 100, 11111, 1.25, 101.0, "test02"],
+#     [False, 100, 1, 188.1, 688.25, "test03"],
+#     [True, 0, 0, 0, 6.25, "test04"],
+# ]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [4, 5, 6, 7]
+# tablet_ = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert one numpy tablet into the database.
+# np_values_ = [
+#     np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
+#     np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
+#     np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
+#     np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
+#     np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
+#     np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
+# ]
+# np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
+# np_tablet_ = NumpyTablet(
+#     "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+# )
+# session.insert_tablet(np_tablet_)
+#
+# # insert one unsorted numpy tablet into the database.
+# np_values_unsorted = [
+#     np.array([False, False, False, True, True], np.dtype(">?")),
+#     np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
+#     np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
+#     np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
+#     np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
+#     np.array(["test09", "test08", "test07", "test06", "test05"]),
+# ]
+# np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
+# np_tablet_unsorted = NumpyTablet(
+#     "root.sg_test_01.d_02",
+#     measurements_,
+#     data_types_,
+#     np_values_unsorted,
+#     np_timestamps_unsorted,
+# )
+# session.insert_tablet(np_tablet_unsorted)
+# print(np_tablet_unsorted.get_timestamps())
+# for value in np_tablet_unsorted.get_values():
+#     print(value)
+#
+# # insert multiple tablets into database
+# tablet_01 = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
+# )
+# tablet_02 = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
+# )
+# session.insert_tablets([tablet_01, tablet_02])
+#
+# # insert one tablet with empty cells into the database.
+# values_ = [
+#     [None, 10, 11, 1.1, 10011.1, "test01"],
+#     [True, None, 11111, 1.25, 101.0, "test02"],
+#     [False, 100, 1, None, 688.25, "test03"],
+#     [True, 0, 0, 0, 6.25, None],
+# ]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [16, 17, 18, 19]
+# tablet_ = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+# ]
+# data_types_list = [
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+# ]
+# values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+#
+# session.insert_records_of_one_device(
+#     "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
+# )
+#
+# # execute non-query sql statement
+# session.execute_non_query_statement(
+#     "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # execute sql query statement
+# with session.execute_query_statement(
+#     "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+# # execute sql query statement
+# with session.execute_query_statement(
+#     "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# # execute statement
+# with session.execute_statement(
+#     "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# session.execute_statement(
+#     "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # insert string records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+# ]
+# values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
+#
+# session.insert_string_records_of_one_device(
+#     "root.sg_test_01.d_03",
+#     time_list,
+#     measurements_list,
+#     values_list,
+# )
+#
+# with session.execute_raw_data_query(
+#     ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# with session.execute_last_data_query(
+#     ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# # delete storage group
+# session.delete_storage_group("root.sg_test_01")
+#
+# # create measurement node template
+# template = Template(name="template_python", share_time=False)
+# m_node_1 = MeasurementNode(
+#     name="s1",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# m_node_2 = MeasurementNode(
+#     name="s2",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# m_node_3 = MeasurementNode(
+#     name="s3",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# template.add_template(m_node_1)
+# template.add_template(m_node_2)
+# template.add_template(m_node_3)
+# session.create_schema_template(template)
+# print("create template success template_python")
+#
+# # create internal node template
+# template_name = "treeTemplate_python"
+# template = Template(name=template_name, share_time=True)
+# i_node_gps = InternalNode(name="GPS", share_time=False)
+# i_node_v = InternalNode(name="vehicle", share_time=True)
+# m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
+#
+# i_node_gps.add_child(m_node_x)
+# i_node_v.add_child(m_node_x)
+# template.add_template(i_node_gps)
+# template.add_template(i_node_v)
+# template.add_template(m_node_x)
+#
+# session.create_schema_template(template)
+# print("create template success treeTemplate_python}")
+#
+# print(session.is_measurement_in_template(template_name, "GPS"))
+# print(session.is_measurement_in_template(template_name, "GPS.x"))
+# print(session.show_all_templates())
+#
+# # # append schema template
+# data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
+# encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
+# compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
+#
+# measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
+# session.add_measurements_in_template(
+#     template_name,
+#     measurements_aligned_path,
+#     data_types,
+#     encoding_list,
+#     compressor_list,
+#     is_aligned=True,
+# )
+# # session.drop_schema_template("add_template_python")
+# measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
+# session.add_measurements_in_template(
+#     template_name,
+#     measurements_aligned_path,
+#     data_types,
+#     encoding_list,
+#     compressor_list,
+#     is_aligned=False,
+# )
+# session.delete_node_in_template(template_name, "aligned.s1")
+# print(session.count_measurements_in_template(template_name))
+# print(session.is_path_exist_in_template(template_name, "aligned.s1"))
+# print(session.is_path_exist_in_template(template_name, "aligned.s2"))
+#
+# session.set_schema_template(template_name, "root.python.set")
+# print(session.show_paths_template_using_on(template_name))
+# print(session.show_paths_template_set_on(template_name))
+# session.unset_schema_template(template_name, "root.python.set")
+#
+# # drop template
+# session.drop_schema_template("template_python")
+# session.drop_schema_template(template_name)
+# print("drop template success, template_python and treeTemplate_python")
 
-# drop template
-session.drop_schema_template("template_python")
-session.drop_schema_template(template_name)
-print("drop template success, template_python and treeTemplate_python")
 # close session connection.
 session.close()
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 471dc98b62..64be51e9ac 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -18,6 +18,8 @@
 import logging
 import struct
 import time
+
+import numpy as np
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
@@ -53,6 +55,8 @@ from .thrift.rpc.ttypes import (
     TSRawDataQueryReq,
     TSLastDataQueryReq,
     TSInsertStringRecordsOfOneDeviceReq,
+    TGroupByTimeParameter,
+    TSFetchWindowBatchReq,
 )
 # for debug
 # from IoTDBConstants import *
@@ -78,13 +82,13 @@ class Session(object):
     DEFAULT_ZONE_ID = time.strftime("%z")
 
     def __init__(
-        self,
-        host,
-        port,
-        user=DEFAULT_USER,
-        password=DEFAULT_PASSWORD,
-        fetch_size=DEFAULT_FETCH_SIZE,
-        zone_id=DEFAULT_ZONE_ID,
+            self,
+            host,
+            port,
+            user=DEFAULT_USER,
+            password=DEFAULT_PASSWORD,
+            fetch_size=DEFAULT_FETCH_SIZE,
+            zone_id=DEFAULT_ZONE_ID,
     ):
         self.__host = host
         self.__port = port
@@ -206,15 +210,15 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_time_series(
-        self,
-        ts_path,
-        data_type,
-        encoding,
-        compressor,
-        props=None,
-        tags=None,
-        attributes=None,
-        alias=None,
+            self,
+            ts_path,
+            data_type,
+            encoding,
+            compressor,
+            props=None,
+            tags=None,
+            attributes=None,
+            alias=None,
     ):
         """
         create single time series
@@ -249,7 +253,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_aligned_time_series(
-        self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+            self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
     ):
         """
         create aligned time series
@@ -281,15 +285,15 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_multi_time_series(
-        self,
-        ts_path_lst,
-        data_type_lst,
-        encoding_lst,
-        compressor_lst,
-        props_lst=None,
-        tags_lst=None,
-        attributes_lst=None,
-        alias_lst=None,
+            self,
+            ts_path_lst,
+            data_type_lst,
+            encoding_lst,
+            compressor_lst,
+            props_lst=None,
+            tags_lst=None,
+            attributes_lst=None,
+            alias_lst=None,
     ):
         """
         create multiple time series
@@ -386,7 +390,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_str_record(
-        self, device_id, timestamp, measurements, string_values
+            self, device_id, timestamp, measurements, string_values
     ):
         """special case for inserting one row of String (TEXT) value"""
         if type(string_values) == str:
@@ -432,7 +436,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -460,7 +464,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_record(
-        self, device_id, timestamp, measurements, data_types, values
+            self, device_id, timestamp, measurements, data_types, values
     ):
         """
         insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
@@ -487,7 +491,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
@@ -515,7 +519,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def test_insert_record(
-        self, device_id, timestamp, measurements, data_types, values
+            self, device_id, timestamp, measurements, data_types, values
     ):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -540,7 +544,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def test_insert_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -566,7 +570,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+            self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -583,7 +587,7 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+            self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -594,19 +598,19 @@ class Session(object):
         )
 
     def gen_insert_records_req(
-        self,
-        device_ids,
-        times,
-        measurements_lst,
-        types_lst,
-        values_lst,
-        is_aligned=False,
+            self,
+            device_ids,
+            times,
+            measurements_lst,
+            types_lst,
+            values_lst,
+            is_aligned=False,
     ):
         if (
-            (len(device_ids) != len(measurements_lst))
-            or (len(times) != len(types_lst))
-            or (len(device_ids) != len(times))
-            or (len(times) != len(values_lst))
+                (len(device_ids) != len(measurements_lst))
+                or (len(times) != len(types_lst))
+                or (len(device_ids) != len(times))
+                or (len(times) != len(values_lst))
         ):
             raise RuntimeError(
                 "deviceIds, times, measurementsList and valuesList's size should be equal"
@@ -614,7 +618,7 @@ class Session(object):
 
         value_lst = []
         for values, data_types, measurements in zip(
-            values_lst, types_lst, measurements_lst
+                values_lst, types_lst, measurements_lst
         ):
             if (len(values) != len(data_types)) or (len(values) != len(measurements)):
                 raise RuntimeError(
@@ -697,7 +701,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_records_of_one_device(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         # sort by timestamp
         sorted_zipped = sorted(
@@ -713,7 +717,7 @@ class Session(object):
         )
 
     def insert_records_of_one_device_sorted(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         """
         Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
@@ -730,9 +734,9 @@ class Session(object):
         # check parameter
         size = len(times_list)
         if (
-            size != len(measurements_list)
-            or size != len(types_list)
-            or size != len(values_list)
+                size != len(measurements_list)
+                or size != len(types_list)
+                or size != len(values_list)
         ):
             raise RuntimeError(
                 "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -755,7 +759,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_records_of_one_device(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         # sort by timestamp
         sorted_zipped = sorted(
@@ -771,7 +775,7 @@ class Session(object):
         )
 
     def insert_aligned_records_of_one_device_sorted(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         """
         Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
@@ -787,9 +791,9 @@ class Session(object):
         # check parameter
         size = len(times_list)
         if (
-            size != len(measurements_list)
-            or size != len(types_list)
-            or size != len(values_list)
+                size != len(measurements_list)
+                or size != len(types_list)
+                or size != len(values_list)
         ):
             raise RuntimeError(
                 "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -812,17 +816,17 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_records_of_one_device_request(
-        self,
-        device_id,
-        times_list,
-        measurements_list,
-        values_list,
-        types_list,
-        is_aligned=False,
+            self,
+            device_id,
+            times_list,
+            measurements_list,
+            values_list,
+            types_list,
+            is_aligned=False,
     ):
         binary_value_list = []
         for values, data_types, measurements in zip(
-            values_list, types_list, measurements_list
+                values_list, types_list, measurements_list
         ):
             data_types = [data_type.value for data_type in data_types]
             if (len(values) != len(data_types)) or (len(values) != len(measurements)):
@@ -1037,7 +1041,7 @@ class Session(object):
         return -1
 
     def execute_raw_data_query(
-        self, paths: list, start_time: int, end_time: int
+            self, paths: list, start_time: int, end_time: int
     ) -> SessionDataSet:
         """
         execute query statement and returns SessionDataSet
@@ -1100,12 +1104,12 @@ class Session(object):
         )
 
     def insert_string_records_of_one_device(
-        self,
-        device_id: str,
-        times: list,
-        measurements_list: list,
-        values_list: list,
-        have_sorted: bool = False,
+            self,
+            device_id: str,
+            times: list,
+            measurements_list: list,
+            values_list: list,
+            have_sorted: bool = False,
     ):
         """
         insert multiple row of string record into database:
@@ -1132,12 +1136,12 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_string_records_of_one_device(
-        self,
-        device_id: str,
-        times: list,
-        measurements_list: list,
-        values: list,
-        have_sorted: bool = False,
+            self,
+            device_id: str,
+            times: list,
+            measurements_list: list,
+            values: list,
+            have_sorted: bool = False,
     ):
         if (len(times) != len(measurements_list)) or (len(times) != len(values)):
             raise RuntimeError(
@@ -1154,13 +1158,13 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_string_records_of_one_device_request(
-        self,
-        device_id,
-        times,
-        measurements_list,
-        values_list,
-        have_sorted,
-        is_aligned=False,
+            self,
+            device_id,
+            times,
+            measurements_list,
+            values_list,
+            have_sorted,
+            is_aligned=False,
     ):
         if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
             raise RuntimeError(
@@ -1244,13 +1248,13 @@ class Session(object):
             raise RuntimeError("execution of statement fails because: ", e)
 
     def add_measurements_in_template(
-        self,
-        template_name: str,
-        measurements_path: list,
-        data_types: list,
-        encodings: list,
-        compressors: list,
-        is_aligned: bool = False,
+            self,
+            template_name: str,
+            measurements_path: list,
+            data_types: list,
+            encodings: list,
+            compressors: list,
+            is_aligned: bool = False,
     ):
         """
         add measurements in the template, the template must already create. This function adds some measurements node.
@@ -1452,10 +1456,38 @@ class Session(object):
         )
         return response.measurements
 
-    def fetch_window_batch(self, query_paths : list, function_name : str, fetch_args):
+    def fetch_window_batch(self, query_paths: list, function_name: str, fetch_args):
         request = TSFetchWindowBatchReq(
-            self.__session_id,query_paths,function_name
-            fetch_args.
+            self.__session_id,
+            self.__statement_id,
+            query_paths,
+            function_name,
+            TGroupByTimeParameter(fetch_args["start_time"], fetch_args["end_time"], fetch_args["interval"],
+                                  fetch_args["sliding_step"],
+                                  fetch_args["indexes"])
         )
-        response = self.__client.fetch_window_batch(request)
-        return
\ No newline at end of file
+        try:
+            resp = self.__client.fetchWindowBatch(request)
+            status = resp.status
+
+            if Session.verify_success(status) == 0:
+                window_batch = []
+                for window_result_set in resp.windowBatchDataSetList:
+                    window_session_data_set = SessionDataSet.init_from_window(
+                        resp.columnNameList,
+                        resp.columnTypeList,
+                        resp.columnNameIndexMap,
+                        self.__statement_id,
+                        self.__session_id,
+                        window_result_set
+                    )
+
+                    window_df = window_session_data_set.to_df(window_session_data_set)
+                    window_batch.append(window_df)
+                return np.array(window_batch)
+            else:
+                raise RuntimeError(
+                    "execution of fetch window batch fails because: {}", status.message
+                )
+        except TTransport.TException as e:
+            raise RuntimeError("execution of fetch window batch fails because: ", e)
diff --git a/client-py/iotdb/dbapi/Cursor.py b/client-py/iotdb/dbapi/Cursor.py
index a1d6e2caab..a6a4e24e7d 100644
--- a/client-py/iotdb/dbapi/Cursor.py
+++ b/client-py/iotdb/dbapi/Cursor.py
@@ -136,7 +136,7 @@ class Cursor(object):
             rows = []
 
             if data_set:
-                data = data_set.todf()
+                data = data_set.to_df()
 
                 if self.__sqlalchemy_mode and time_index:
                     time_column = data.columns[0]
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index de0fe7728c..99417b0b80 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -48,6 +48,7 @@ class IoTDBRpcDataSet(object):
         session_id,
         query_data_set,
         fetch_size,
+        is_rpc_fetch_result
     ):
         self.__statement_id = statement_id
         self.__session_id = session_id
@@ -58,6 +59,7 @@ class IoTDBRpcDataSet(object):
         self.__fetch_size = fetch_size
         self.__column_size = len(column_name_list)
         self.__default_time_out = 1000
+        self.__is_rpc_fetch_result = is_rpc_fetch_result
 
         self.__column_name_list = []
         self.__column_type_list = []
@@ -137,7 +139,7 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__is_rpc_fetch_result and self.fetch_results():
             self.construct_one_row()
             return True
         return False
@@ -152,14 +154,14 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__is_rpc_fetch_result and self.fetch_results():
             return True
         return False
 
     def _to_bitstring(self, b):
         return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b))
 
-    def resultset_to_pandas(self):
+    def resultset_to_numpy(self):
         result = {}
         for column_name in self.__column_name_list:
             result[column_name] = None
@@ -278,9 +280,10 @@ class IoTDBRpcDataSet(object):
         for k, v in result.items():
             if v is None:
                 result[k] = []
+        return result
 
-        df = pd.DataFrame(result)
-        return df
+    def resultset_to_pandas(self):
+        return pd.DataFrame(self.resultset_to_numpy())
 
     def construct_one_row(self):
         # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
diff --git a/client-py/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py
index 02eef027df..b9c08de453 100644
--- a/client-py/iotdb/utils/SessionDataSet.py
+++ b/client-py/iotdb/utils/SessionDataSet.py
@@ -32,17 +32,17 @@ logger = logging.getLogger("IoTDB")
 
 class SessionDataSet(object):
     def __init__(
-        self,
-        sql,
-        column_name_list,
-        column_type_list,
-        column_name_index,
-        query_id,
-        client,
-        statement_id,
-        session_id,
-        query_data_set,
-        ignore_timestamp,
+            self,
+            sql,
+            column_name_list,
+            column_type_list,
+            column_name_index,
+            query_id,
+            client,
+            statement_id,
+            session_id,
+            query_data_set,
+            ignore_timestamp,
     ):
         self.iotdb_rpc_data_set = IoTDBRpcDataSet(
             sql,
@@ -56,7 +56,32 @@ class SessionDataSet(object):
             session_id,
             query_data_set,
             1024,
+            True
+        )
+
+    @classmethod
+    def init_from_window(self, column_name_list,
+                         column_type_list,
+                         column_name_index,
+                         statement_id,
+                         session_id,
+                         query_data_set
+                         ):
+        self.iotdb_rpc_data_set = IoTDBRpcDataSet(
+            "",
+            column_name_list,
+            column_type_list,
+            column_name_index,
+            False,
+            -1,
+            None,
+            statement_id,
+            session_id,
+            query_data_set,
+            1024,
+            False
         )
+        return self
 
     def __enter__(self):
         return self
@@ -96,8 +121,8 @@ class SessionDataSet(object):
                 data_set_column_index -= 1
             column_name = self.iotdb_rpc_data_set.get_column_names()[index]
             location = (
-                self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
-                - IoTDBRpcDataSet.START_INDEX
+                    self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
+                    - IoTDBRpcDataSet.START_INDEX
             )
 
             if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
@@ -136,9 +161,12 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
-    def todf(self):
+    def to_df(self):
         return resultset_to_pandas(self)
 
+    def to_numpy(self):
+        return resultset_to_numpy(self)
+
 
 def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
     """
@@ -150,6 +178,16 @@ def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
     return result_set.iotdb_rpc_data_set.resultset_to_pandas()
 
 
+def resultset_to_numpy(result_set: SessionDataSet):
+    """
+    Transforms a SessionDataSet from IoTDB to a Numpy array
+    Each Field from IoTDB is a column
+    :param result_set:
+    :return:
+    """
+    return result_set.iotdb_rpc_data_set.resultset_to_numpy()
+
+
 def get_typed_point(field: Field, none_value=None):
     choices = {
         # In Case of Boolean, cast to 0 / 1
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index c7cce58ea5..0c59d6127f 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -34,7 +34,7 @@ def test_simple_query():
 
         # Read
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df = session_data_set.todf()
+        df = session_data_set.to_df()
 
         session.close()
 
@@ -54,7 +54,7 @@ def test_non_time_query():
 
         # Read
         session_data_set = session.execute_query_statement("SHOW TIMESERIES")
-        df = session_data_set.todf()
+        df = session_data_set.to_df()
 
         session.close()
 
diff --git a/client-py/tests/test_tablet.py b/client-py/tests/test_tablet.py
index 1e80277d77..a3fb8dac6d 100644
--- a/client-py/tests/test_tablet.py
+++ b/client-py/tests/test_tablet.py
@@ -61,7 +61,7 @@ def test_tablet_insertion():
         session_data_set = session.execute_query_statement(
             "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
         )
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -104,7 +104,7 @@ def test_nullable_tablet_insertion():
         session_data_set = session.execute_query_statement(
             "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
         )
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
diff --git a/client-py/tests/test_todf.py b/client-py/tests/test_todf.py
index 07953446cf..03fe2e76a5 100644
--- a/client-py/tests/test_todf.py
+++ b/client-py/tests/test_todf.py
@@ -94,7 +94,7 @@ def test_simple_query():
         df_input.insert(0, "Time", timestamps)
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -174,7 +174,7 @@ def test_with_null_query():
         df_input.insert(0, "Time", timestamps)
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -212,7 +212,7 @@ def test_multi_fetch():
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
         session_data_set.set_fetch_size(100)
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()