You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 12:48:55 UTC
[iotdb] 01/03: finish python session
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9fd48778854f9c41077a3d1047e592a2d8469f91
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 14 20:04:29 2022 +0800
finish python session
---
client-py/README.md | 2 +-
client-py/SessionExample.py | 762 ++++++++++++++++---------------
client-py/iotdb/Session.py | 224 +++++----
client-py/iotdb/dbapi/Cursor.py | 2 +-
client-py/iotdb/utils/IoTDBRpcDataSet.py | 13 +-
client-py/iotdb/utils/SessionDataSet.py | 66 ++-
client-py/tests/test_dataframe.py | 4 +-
client-py/tests/test_tablet.py | 4 +-
client-py/tests/test_todf.py | 6 +-
9 files changed, 586 insertions(+), 497 deletions(-)
diff --git a/client-py/README.md b/client-py/README.md
index 3197cc7186..c124b32881 100644
--- a/client-py/README.md
+++ b/client-py/README.md
@@ -386,7 +386,7 @@ session.open(False)
result = session.execute_query_statement("SELECT * FROM root.*")
# Transform to Pandas Dataset
-df = result.todf()
+df = result.to_df()
session.close()
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 61e82234db..4f04855def 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -35,382 +35,398 @@ password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
-# set and delete storage groups
-session.set_storage_group("root.sg_test_01")
-session.set_storage_group("root.sg_test_02")
-session.set_storage_group("root.sg_test_03")
-session.set_storage_group("root.sg_test_04")
-session.delete_storage_group("root.sg_test_02")
-session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
-
-# setting time series.
-session.create_time_series(
- "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
- "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
- "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
- "root.sg_test_01.d_02.s_01",
- TSDataType.BOOLEAN,
- TSEncoding.PLAIN,
- Compressor.SNAPPY,
- None,
- {"tag1": "v1"},
- {"description": "v1"},
- "temperature",
-)
-
-# setting multiple time series once.
-ts_path_lst_ = [
- "root.sg_test_01.d_01.s_04",
- "root.sg_test_01.d_01.s_05",
- "root.sg_test_01.d_01.s_06",
- "root.sg_test_01.d_01.s_07",
- "root.sg_test_01.d_01.s_08",
- "root.sg_test_01.d_01.s_09",
-]
-data_type_lst_ = [
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.TEXT,
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
- ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
-)
-
-ts_path_lst_ = [
- "root.sg_test_01.d_02.s_04",
- "root.sg_test_01.d_02.s_05",
- "root.sg_test_01.d_02.s_06",
- "root.sg_test_01.d_02.s_07",
- "root.sg_test_01.d_02.s_08",
- "root.sg_test_01.d_02.s_09",
-]
-data_type_lst_ = [
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.TEXT,
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
-attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
- ts_path_lst_,
- data_type_lst_,
- encoding_lst_,
- compressor_lst_,
- None,
- tags_lst_,
- attributes_lst_,
- None,
-)
-
-# delete time series
-session.delete_time_series(
- [
- "root.sg_test_01.d_01.s_07",
- "root.sg_test_01.d_01.s_08",
- "root.sg_test_01.d_01.s_09",
- ]
-)
-
-# checking time series
-print(
- "s_07 expecting False, checking result: ",
- session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
-)
-print(
- "s_03 expecting True, checking result: ",
- session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
-)
-print(
- "d_02.s_01 expecting True, checking result: ",
- session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
-)
-print(
- "d_02.s_06 expecting True, checking result: ",
- session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
-)
-
-# insert one record into the database.
-measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
-values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
-data_types_ = [
- TSDataType.BOOLEAN,
- TSDataType.INT32,
- TSDataType.INT64,
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.TEXT,
+ts_path_list = [
+ "root.sg1.d1.s1",
+ "root.sg1.d1.s2"
]
-session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
-
-# insert multiple records into database
-measurements_list_ = [
- ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
- ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
-]
-values_list_ = [
- [False, 22, 33, 4.4, 55.1, "test_records01"],
- [True, 77, 88, 1.25, 8.125, "test_records02"],
-]
-data_type_list_ = [data_types_, data_types_]
-device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
-session.insert_records(
- device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
-)
-
-# insert one tablet into the database.
-values_ = [
- [False, 10, 11, 1.1, 10011.1, "test01"],
- [True, 100, 11111, 1.25, 101.0, "test02"],
- [False, 100, 1, 188.1, 688.25, "test03"],
- [True, 0, 0, 0, 6.25, "test04"],
-] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [4, 5, 6, 7]
-tablet_ = Tablet(
- "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert one numpy tablet into the database.
-np_values_ = [
- np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
- np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
- np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
- np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
- np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
- np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
-]
-np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
-np_tablet_ = NumpyTablet(
- "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
-)
-session.insert_tablet(np_tablet_)
-
-# insert one unsorted numpy tablet into the database.
-np_values_unsorted = [
- np.array([False, False, False, True, True], np.dtype(">?")),
- np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
- np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
- np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
- np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
- np.array(["test09", "test08", "test07", "test06", "test05"]),
-]
-np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
-np_tablet_unsorted = NumpyTablet(
- "root.sg_test_01.d_02",
- measurements_,
- data_types_,
- np_values_unsorted,
- np_timestamps_unsorted,
-)
-session.insert_tablet(np_tablet_unsorted)
-print(np_tablet_unsorted.get_timestamps())
-for value in np_tablet_unsorted.get_values():
- print(value)
-
-# insert multiple tablets into database
-tablet_01 = Tablet(
- "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
-)
-tablet_02 = Tablet(
- "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
-)
-session.insert_tablets([tablet_01, tablet_02])
-
-# insert one tablet with empty cells into the database.
-values_ = [
- [None, 10, 11, 1.1, 10011.1, "test01"],
- [True, None, 11111, 1.25, 101.0, "test02"],
- [False, 100, 1, None, 688.25, "test03"],
- [True, 0, 0, 0, 6.25, None],
-] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [16, 17, 18, 19]
-tablet_ = Tablet(
- "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert records of one device
-time_list = [1, 2, 3]
-measurements_list = [
- ["s_01", "s_02", "s_03"],
- ["s_01", "s_02", "s_03"],
- ["s_01", "s_02", "s_03"],
-]
-data_types_list = [
- [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
- [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
- [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-]
-values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
-
-session.insert_records_of_one_device(
- "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
-)
-
-# execute non-query sql statement
-session.execute_non_query_statement(
- "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# execute sql query statement
-with session.execute_query_statement(
- "select * from root.sg_test_01.d_01"
-) as session_data_set:
- session_data_set.set_fetch_size(1024)
- while session_data_set.has_next():
- print(session_data_set.next())
-# execute sql query statement
-with session.execute_query_statement(
- "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
-) as session_data_set:
- session_data_set.set_fetch_size(1024)
- while session_data_set.has_next():
- print(session_data_set.next())
-
-# execute statement
-with session.execute_statement(
- "select * from root.sg_test_01.d_01"
-) as session_data_set:
- while session_data_set.has_next():
- print(session_data_set.next())
-
-session.execute_statement(
- "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# insert string records of one device
-time_list = [1, 2, 3]
-measurements_list = [
- ["s_01", "s_02", "s_03"],
- ["s_01", "s_02", "s_03"],
- ["s_01", "s_02", "s_03"],
-]
-values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
-
-session.insert_string_records_of_one_device(
- "root.sg_test_01.d_03",
- time_list,
- measurements_list,
- values_list,
-)
-
-with session.execute_raw_data_query(
- ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
-) as session_data_set:
- session_data_set.set_fetch_size(1024)
- while session_data_set.has_next():
- print(session_data_set.next())
-
-with session.execute_last_data_query(
- ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
-) as session_data_set:
- session_data_set.set_fetch_size(1024)
- while session_data_set.has_next():
- print(session_data_set.next())
-
-# delete storage group
-session.delete_storage_group("root.sg_test_01")
-
-# create measurement node template
-template = Template(name="template_python", share_time=False)
-m_node_1 = MeasurementNode(
- name="s1",
- data_type=TSDataType.INT64,
- encoding=TSEncoding.RLE,
- compression_type=Compressor.SNAPPY,
-)
-m_node_2 = MeasurementNode(
- name="s2",
- data_type=TSDataType.INT64,
- encoding=TSEncoding.RLE,
- compression_type=Compressor.SNAPPY,
-)
-m_node_3 = MeasurementNode(
- name="s3",
- data_type=TSDataType.INT64,
- encoding=TSEncoding.RLE,
- compression_type=Compressor.SNAPPY,
-)
-template.add_template(m_node_1)
-template.add_template(m_node_2)
-template.add_template(m_node_3)
-session.create_schema_template(template)
-print("create template success template_python")
-
-# create internal node template
-template_name = "treeTemplate_python"
-template = Template(name=template_name, share_time=True)
-i_node_gps = InternalNode(name="GPS", share_time=False)
-i_node_v = InternalNode(name="vehicle", share_time=True)
-m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
-
-i_node_gps.add_child(m_node_x)
-i_node_v.add_child(m_node_x)
-template.add_template(i_node_gps)
-template.add_template(i_node_v)
-template.add_template(m_node_x)
-session.create_schema_template(template)
-print("create template success treeTemplate_python}")
-
-print(session.is_measurement_in_template(template_name, "GPS"))
-print(session.is_measurement_in_template(template_name, "GPS.x"))
-print(session.show_all_templates())
-
-# # append schema template
-data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
-encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
-compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
-
-measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
-session.add_measurements_in_template(
- template_name,
- measurements_aligned_path,
- data_types,
- encoding_list,
- compressor_list,
- is_aligned=True,
-)
-# session.drop_schema_template("add_template_python")
-measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
-session.add_measurements_in_template(
- template_name,
- measurements_aligned_path,
- data_types,
- encoding_list,
- compressor_list,
- is_aligned=False,
-)
-session.delete_node_in_template(template_name, "aligned.s1")
-print(session.count_measurements_in_template(template_name))
-print(session.is_path_exist_in_template(template_name, "aligned.s1"))
-print(session.is_path_exist_in_template(template_name, "aligned.s2"))
-
-session.set_schema_template(template_name, "root.python.set")
-print(session.show_paths_template_using_on(template_name))
-print(session.show_paths_template_set_on(template_name))
-session.unset_schema_template(template_name, "root.python.set")
+fetch_args = {
+ "start_time": 0,
+ "end_time": 32,
+ "interval": 4,
+ "sliding_step": 1,
+ "indexes": [0, 3, 5, 9]
+}
+
+print(session.fetch_window_batch(ts_path_list, None, fetch_args))
+
+# # set and delete storage groups
+# session.set_storage_group("root.sg_test_01")
+# session.set_storage_group("root.sg_test_02")
+# session.set_storage_group("root.sg_test_03")
+# session.set_storage_group("root.sg_test_04")
+# session.delete_storage_group("root.sg_test_02")
+# session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+#
+# # setting time series.
+# session.create_time_series(
+# "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+# "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+# "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+# "root.sg_test_01.d_02.s_01",
+# TSDataType.BOOLEAN,
+# TSEncoding.PLAIN,
+# Compressor.SNAPPY,
+# None,
+# {"tag1": "v1"},
+# {"description": "v1"},
+# "temperature",
+# )
+#
+# # setting multiple time series once.
+# ts_path_lst_ = [
+# "root.sg_test_01.d_01.s_04",
+# "root.sg_test_01.d_01.s_05",
+# "root.sg_test_01.d_01.s_06",
+# "root.sg_test_01.d_01.s_07",
+# "root.sg_test_01.d_01.s_08",
+# "root.sg_test_01.d_01.s_09",
+# ]
+# data_type_lst_ = [
+# TSDataType.FLOAT,
+# TSDataType.DOUBLE,
+# TSDataType.TEXT,
+# TSDataType.FLOAT,
+# TSDataType.DOUBLE,
+# TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+# ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+# )
+#
+# ts_path_lst_ = [
+# "root.sg_test_01.d_02.s_04",
+# "root.sg_test_01.d_02.s_05",
+# "root.sg_test_01.d_02.s_06",
+# "root.sg_test_01.d_02.s_07",
+# "root.sg_test_01.d_02.s_08",
+# "root.sg_test_01.d_02.s_09",
+# ]
+# data_type_lst_ = [
+# TSDataType.FLOAT,
+# TSDataType.DOUBLE,
+# TSDataType.TEXT,
+# TSDataType.FLOAT,
+# TSDataType.DOUBLE,
+# TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
+# attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+# ts_path_lst_,
+# data_type_lst_,
+# encoding_lst_,
+# compressor_lst_,
+# None,
+# tags_lst_,
+# attributes_lst_,
+# None,
+# )
+#
+# # delete time series
+# session.delete_time_series(
+# [
+# "root.sg_test_01.d_01.s_07",
+# "root.sg_test_01.d_01.s_08",
+# "root.sg_test_01.d_01.s_09",
+# ]
+# )
+#
+# # checking time series
+# print(
+# "s_07 expecting False, checking result: ",
+# session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
+# )
+# print(
+# "s_03 expecting True, checking result: ",
+# session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
+# )
+# print(
+# "d_02.s_01 expecting True, checking result: ",
+# session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
+# )
+# print(
+# "d_02.s_06 expecting True, checking result: ",
+# session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
+# )
+#
+# # insert one record into the database.
+# measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+# values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+# data_types_ = [
+# TSDataType.BOOLEAN,
+# TSDataType.INT32,
+# TSDataType.INT64,
+# TSDataType.FLOAT,
+# TSDataType.DOUBLE,
+# TSDataType.TEXT,
+# ]
+# session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
+#
+# # insert multiple records into database
+# measurements_list_ = [
+# ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+# ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+# ]
+# values_list_ = [
+# [False, 22, 33, 4.4, 55.1, "test_records01"],
+# [True, 77, 88, 1.25, 8.125, "test_records02"],
+# ]
+# data_type_list_ = [data_types_, data_types_]
+# device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+# session.insert_records(
+# device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+# )
+#
+# # insert one tablet into the database.
+# values_ = [
+# [False, 10, 11, 1.1, 10011.1, "test01"],
+# [True, 100, 11111, 1.25, 101.0, "test02"],
+# [False, 100, 1, 188.1, 688.25, "test03"],
+# [True, 0, 0, 0, 6.25, "test04"],
+# ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [4, 5, 6, 7]
+# tablet_ = Tablet(
+# "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert one numpy tablet into the database.
+# np_values_ = [
+# np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
+# np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
+# np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
+# np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
+# np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
+# np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
+# ]
+# np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
+# np_tablet_ = NumpyTablet(
+# "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+# )
+# session.insert_tablet(np_tablet_)
+#
+# # insert one unsorted numpy tablet into the database.
+# np_values_unsorted = [
+# np.array([False, False, False, True, True], np.dtype(">?")),
+# np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
+# np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
+# np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
+# np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
+# np.array(["test09", "test08", "test07", "test06", "test05"]),
+# ]
+# np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
+# np_tablet_unsorted = NumpyTablet(
+# "root.sg_test_01.d_02",
+# measurements_,
+# data_types_,
+# np_values_unsorted,
+# np_timestamps_unsorted,
+# )
+# session.insert_tablet(np_tablet_unsorted)
+# print(np_tablet_unsorted.get_timestamps())
+# for value in np_tablet_unsorted.get_values():
+# print(value)
+#
+# # insert multiple tablets into database
+# tablet_01 = Tablet(
+# "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
+# )
+# tablet_02 = Tablet(
+# "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
+# )
+# session.insert_tablets([tablet_01, tablet_02])
+#
+# # insert one tablet with empty cells into the database.
+# values_ = [
+# [None, 10, 11, 1.1, 10011.1, "test01"],
+# [True, None, 11111, 1.25, 101.0, "test02"],
+# [False, 100, 1, None, 688.25, "test03"],
+# [True, 0, 0, 0, 6.25, None],
+# ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [16, 17, 18, 19]
+# tablet_ = Tablet(
+# "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+# ["s_01", "s_02", "s_03"],
+# ["s_01", "s_02", "s_03"],
+# ["s_01", "s_02", "s_03"],
+# ]
+# data_types_list = [
+# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+# [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+# ]
+# values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+#
+# session.insert_records_of_one_device(
+# "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
+# )
+#
+# # execute non-query sql statement
+# session.execute_non_query_statement(
+# "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # execute sql query statement
+# with session.execute_query_statement(
+# "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+# session_data_set.set_fetch_size(1024)
+# while session_data_set.has_next():
+# print(session_data_set.next())
+# # execute sql query statement
+# with session.execute_query_statement(
+# "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
+# ) as session_data_set:
+# session_data_set.set_fetch_size(1024)
+# while session_data_set.has_next():
+# print(session_data_set.next())
+#
+# # execute statement
+# with session.execute_statement(
+# "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+# while session_data_set.has_next():
+# print(session_data_set.next())
+#
+# session.execute_statement(
+# "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # insert string records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+# ["s_01", "s_02", "s_03"],
+# ["s_01", "s_02", "s_03"],
+# ["s_01", "s_02", "s_03"],
+# ]
+# values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
+#
+# session.insert_string_records_of_one_device(
+# "root.sg_test_01.d_03",
+# time_list,
+# measurements_list,
+# values_list,
+# )
+#
+# with session.execute_raw_data_query(
+# ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
+# ) as session_data_set:
+# session_data_set.set_fetch_size(1024)
+# while session_data_set.has_next():
+# print(session_data_set.next())
+#
+# with session.execute_last_data_query(
+# ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
+# ) as session_data_set:
+# session_data_set.set_fetch_size(1024)
+# while session_data_set.has_next():
+# print(session_data_set.next())
+#
+# # delete storage group
+# session.delete_storage_group("root.sg_test_01")
+#
+# # create measurement node template
+# template = Template(name="template_python", share_time=False)
+# m_node_1 = MeasurementNode(
+# name="s1",
+# data_type=TSDataType.INT64,
+# encoding=TSEncoding.RLE,
+# compression_type=Compressor.SNAPPY,
+# )
+# m_node_2 = MeasurementNode(
+# name="s2",
+# data_type=TSDataType.INT64,
+# encoding=TSEncoding.RLE,
+# compression_type=Compressor.SNAPPY,
+# )
+# m_node_3 = MeasurementNode(
+# name="s3",
+# data_type=TSDataType.INT64,
+# encoding=TSEncoding.RLE,
+# compression_type=Compressor.SNAPPY,
+# )
+# template.add_template(m_node_1)
+# template.add_template(m_node_2)
+# template.add_template(m_node_3)
+# session.create_schema_template(template)
+# print("create template success template_python")
+#
+# # create internal node template
+# template_name = "treeTemplate_python"
+# template = Template(name=template_name, share_time=True)
+# i_node_gps = InternalNode(name="GPS", share_time=False)
+# i_node_v = InternalNode(name="vehicle", share_time=True)
+# m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
+#
+# i_node_gps.add_child(m_node_x)
+# i_node_v.add_child(m_node_x)
+# template.add_template(i_node_gps)
+# template.add_template(i_node_v)
+# template.add_template(m_node_x)
+#
+# session.create_schema_template(template)
+# print("create template success treeTemplate_python}")
+#
+# print(session.is_measurement_in_template(template_name, "GPS"))
+# print(session.is_measurement_in_template(template_name, "GPS.x"))
+# print(session.show_all_templates())
+#
+# # # append schema template
+# data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
+# encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
+# compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
+#
+# measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
+# session.add_measurements_in_template(
+# template_name,
+# measurements_aligned_path,
+# data_types,
+# encoding_list,
+# compressor_list,
+# is_aligned=True,
+# )
+# # session.drop_schema_template("add_template_python")
+# measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
+# session.add_measurements_in_template(
+# template_name,
+# measurements_aligned_path,
+# data_types,
+# encoding_list,
+# compressor_list,
+# is_aligned=False,
+# )
+# session.delete_node_in_template(template_name, "aligned.s1")
+# print(session.count_measurements_in_template(template_name))
+# print(session.is_path_exist_in_template(template_name, "aligned.s1"))
+# print(session.is_path_exist_in_template(template_name, "aligned.s2"))
+#
+# session.set_schema_template(template_name, "root.python.set")
+# print(session.show_paths_template_using_on(template_name))
+# print(session.show_paths_template_set_on(template_name))
+# session.unset_schema_template(template_name, "root.python.set")
+#
+# # drop template
+# session.drop_schema_template("template_python")
+# session.drop_schema_template(template_name)
+# print("drop template success, template_python and treeTemplate_python")
-# drop template
-session.drop_schema_template("template_python")
-session.drop_schema_template(template_name)
-print("drop template success, template_python and treeTemplate_python")
# close session connection.
session.close()
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 471dc98b62..64be51e9ac 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -18,6 +18,8 @@
import logging
import struct
import time
+
+import numpy as np
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport
@@ -53,6 +55,8 @@ from .thrift.rpc.ttypes import (
TSRawDataQueryReq,
TSLastDataQueryReq,
TSInsertStringRecordsOfOneDeviceReq,
+ TGroupByTimeParameter,
+ TSFetchWindowBatchReq,
)
# for debug
# from IoTDBConstants import *
@@ -78,13 +82,13 @@ class Session(object):
DEFAULT_ZONE_ID = time.strftime("%z")
def __init__(
- self,
- host,
- port,
- user=DEFAULT_USER,
- password=DEFAULT_PASSWORD,
- fetch_size=DEFAULT_FETCH_SIZE,
- zone_id=DEFAULT_ZONE_ID,
+ self,
+ host,
+ port,
+ user=DEFAULT_USER,
+ password=DEFAULT_PASSWORD,
+ fetch_size=DEFAULT_FETCH_SIZE,
+ zone_id=DEFAULT_ZONE_ID,
):
self.__host = host
self.__port = port
@@ -206,15 +210,15 @@ class Session(object):
return Session.verify_success(status)
def create_time_series(
- self,
- ts_path,
- data_type,
- encoding,
- compressor,
- props=None,
- tags=None,
- attributes=None,
- alias=None,
+ self,
+ ts_path,
+ data_type,
+ encoding,
+ compressor,
+ props=None,
+ tags=None,
+ attributes=None,
+ alias=None,
):
"""
create single time series
@@ -249,7 +253,7 @@ class Session(object):
return Session.verify_success(status)
def create_aligned_time_series(
- self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+ self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
):
"""
create aligned time series
@@ -281,15 +285,15 @@ class Session(object):
return Session.verify_success(status)
def create_multi_time_series(
- self,
- ts_path_lst,
- data_type_lst,
- encoding_lst,
- compressor_lst,
- props_lst=None,
- tags_lst=None,
- attributes_lst=None,
- alias_lst=None,
+ self,
+ ts_path_lst,
+ data_type_lst,
+ encoding_lst,
+ compressor_lst,
+ props_lst=None,
+ tags_lst=None,
+ attributes_lst=None,
+ alias_lst=None,
):
"""
create multiple time series
@@ -386,7 +390,7 @@ class Session(object):
return Session.verify_success(status)
def insert_aligned_str_record(
- self, device_id, timestamp, measurements, string_values
+ self, device_id, timestamp, measurements, string_values
):
"""special case for inserting one row of String (TEXT) value"""
if type(string_values) == str:
@@ -432,7 +436,7 @@ class Session(object):
return Session.verify_success(status)
def insert_records(
- self, device_ids, times, measurements_lst, types_lst, values_lst
+ self, device_ids, times, measurements_lst, types_lst, values_lst
):
"""
insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -460,7 +464,7 @@ class Session(object):
return Session.verify_success(status)
def insert_aligned_record(
- self, device_id, timestamp, measurements, data_types, values
+ self, device_id, timestamp, measurements, data_types, values
):
"""
insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
@@ -487,7 +491,7 @@ class Session(object):
return Session.verify_success(status)
def insert_aligned_records(
- self, device_ids, times, measurements_lst, types_lst, values_lst
+ self, device_ids, times, measurements_lst, types_lst, values_lst
):
"""
insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
@@ -515,7 +519,7 @@ class Session(object):
return Session.verify_success(status)
def test_insert_record(
- self, device_id, timestamp, measurements, data_types, values
+ self, device_id, timestamp, measurements, data_types, values
):
"""
this method NOT insert data into database and the server just return after accept the request, this method
@@ -540,7 +544,7 @@ class Session(object):
return Session.verify_success(status)
def test_insert_records(
- self, device_ids, times, measurements_lst, types_lst, values_lst
+ self, device_ids, times, measurements_lst, types_lst, values_lst
):
"""
this method NOT insert data into database and the server just return after accept the request, this method
@@ -566,7 +570,7 @@ class Session(object):
return Session.verify_success(status)
def gen_insert_record_req(
- self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+ self, device_id, timestamp, measurements, data_types, values, is_aligned=False
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
@@ -583,7 +587,7 @@ class Session(object):
)
def gen_insert_str_record_req(
- self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+ self, device_id, timestamp, measurements, data_types, values, is_aligned=False
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
@@ -594,19 +598,19 @@ class Session(object):
)
def gen_insert_records_req(
- self,
- device_ids,
- times,
- measurements_lst,
- types_lst,
- values_lst,
- is_aligned=False,
+ self,
+ device_ids,
+ times,
+ measurements_lst,
+ types_lst,
+ values_lst,
+ is_aligned=False,
):
if (
- (len(device_ids) != len(measurements_lst))
- or (len(times) != len(types_lst))
- or (len(device_ids) != len(times))
- or (len(times) != len(values_lst))
+ (len(device_ids) != len(measurements_lst))
+ or (len(times) != len(types_lst))
+ or (len(device_ids) != len(times))
+ or (len(times) != len(values_lst))
):
raise RuntimeError(
"deviceIds, times, measurementsList and valuesList's size should be equal"
@@ -614,7 +618,7 @@ class Session(object):
value_lst = []
for values, data_types, measurements in zip(
- values_lst, types_lst, measurements_lst
+ values_lst, types_lst, measurements_lst
):
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
raise RuntimeError(
@@ -697,7 +701,7 @@ class Session(object):
return Session.verify_success(status)
def insert_records_of_one_device(
- self, device_id, times_list, measurements_list, types_list, values_list
+ self, device_id, times_list, measurements_list, types_list, values_list
):
# sort by timestamp
sorted_zipped = sorted(
@@ -713,7 +717,7 @@ class Session(object):
)
def insert_records_of_one_device_sorted(
- self, device_id, times_list, measurements_list, types_list, values_list
+ self, device_id, times_list, measurements_list, types_list, values_list
):
"""
Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
@@ -730,9 +734,9 @@ class Session(object):
# check parameter
size = len(times_list)
if (
- size != len(measurements_list)
- or size != len(types_list)
- or size != len(values_list)
+ size != len(measurements_list)
+ or size != len(types_list)
+ or size != len(values_list)
):
raise RuntimeError(
"insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -755,7 +759,7 @@ class Session(object):
return Session.verify_success(status)
def insert_aligned_records_of_one_device(
- self, device_id, times_list, measurements_list, types_list, values_list
+ self, device_id, times_list, measurements_list, types_list, values_list
):
# sort by timestamp
sorted_zipped = sorted(
@@ -771,7 +775,7 @@ class Session(object):
)
def insert_aligned_records_of_one_device_sorted(
- self, device_id, times_list, measurements_list, types_list, values_list
+ self, device_id, times_list, measurements_list, types_list, values_list
):
"""
Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
@@ -787,9 +791,9 @@ class Session(object):
# check parameter
size = len(times_list)
if (
- size != len(measurements_list)
- or size != len(types_list)
- or size != len(values_list)
+ size != len(measurements_list)
+ or size != len(types_list)
+ or size != len(values_list)
):
raise RuntimeError(
"insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -812,17 +816,17 @@ class Session(object):
return Session.verify_success(status)
def gen_insert_records_of_one_device_request(
- self,
- device_id,
- times_list,
- measurements_list,
- values_list,
- types_list,
- is_aligned=False,
+ self,
+ device_id,
+ times_list,
+ measurements_list,
+ values_list,
+ types_list,
+ is_aligned=False,
):
binary_value_list = []
for values, data_types, measurements in zip(
- values_list, types_list, measurements_list
+ values_list, types_list, measurements_list
):
data_types = [data_type.value for data_type in data_types]
if (len(values) != len(data_types)) or (len(values) != len(measurements)):
@@ -1037,7 +1041,7 @@ class Session(object):
return -1
def execute_raw_data_query(
- self, paths: list, start_time: int, end_time: int
+ self, paths: list, start_time: int, end_time: int
) -> SessionDataSet:
"""
execute query statement and returns SessionDataSet
@@ -1100,12 +1104,12 @@ class Session(object):
)
def insert_string_records_of_one_device(
- self,
- device_id: str,
- times: list,
- measurements_list: list,
- values_list: list,
- have_sorted: bool = False,
+ self,
+ device_id: str,
+ times: list,
+ measurements_list: list,
+ values_list: list,
+ have_sorted: bool = False,
):
"""
insert multiple row of string record into database:
@@ -1132,12 +1136,12 @@ class Session(object):
return Session.verify_success(status)
def insert_aligned_string_records_of_one_device(
- self,
- device_id: str,
- times: list,
- measurements_list: list,
- values: list,
- have_sorted: bool = False,
+ self,
+ device_id: str,
+ times: list,
+ measurements_list: list,
+ values: list,
+ have_sorted: bool = False,
):
if (len(times) != len(measurements_list)) or (len(times) != len(values)):
raise RuntimeError(
@@ -1154,13 +1158,13 @@ class Session(object):
return Session.verify_success(status)
def gen_insert_string_records_of_one_device_request(
- self,
- device_id,
- times,
- measurements_list,
- values_list,
- have_sorted,
- is_aligned=False,
+ self,
+ device_id,
+ times,
+ measurements_list,
+ values_list,
+ have_sorted,
+ is_aligned=False,
):
if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
raise RuntimeError(
@@ -1244,13 +1248,13 @@ class Session(object):
raise RuntimeError("execution of statement fails because: ", e)
def add_measurements_in_template(
- self,
- template_name: str,
- measurements_path: list,
- data_types: list,
- encodings: list,
- compressors: list,
- is_aligned: bool = False,
+ self,
+ template_name: str,
+ measurements_path: list,
+ data_types: list,
+ encodings: list,
+ compressors: list,
+ is_aligned: bool = False,
):
"""
add measurements in the template, the template must already create. This function adds some measurements node.
@@ -1452,10 +1456,38 @@ class Session(object):
)
return response.measurements
- def fetch_window_batch(self, query_paths : list, function_name : str, fetch_args):
+ def fetch_window_batch(self, query_paths: list, function_name: str, fetch_args):
request = TSFetchWindowBatchReq(
- self.__session_id,query_paths,function_name
- fetch_args.
+ self.__session_id,
+ self.__statement_id,
+ query_paths,
+ function_name,
+ TGroupByTimeParameter(fetch_args["start_time"], fetch_args["end_time"], fetch_args["interval"],
+ fetch_args["sliding_step"],
+ fetch_args["indexes"])
)
- response = self.__client.fetch_window_batch(request)
- return
\ No newline at end of file
+ try:
+ resp = self.__client.fetchWindowBatch(request)
+ status = resp.status
+
+ if Session.verify_success(status) == 0:
+ window_batch = []
+ for window_result_set in resp.windowBatchDataSetList:
+ window_session_data_set = SessionDataSet.init_from_window(
+ resp.columnNameList,
+ resp.columnTypeList,
+ resp.columnNameIndexMap,
+ self.__statement_id,
+ self.__session_id,
+ window_result_set
+ )
+
+ window_df = window_session_data_set.to_df(window_session_data_set)
+ window_batch.append(window_df)
+ return np.array(window_batch)
+ else:
+ raise RuntimeError(
+ "execution of fetch window batch fails because: {}", status.message
+ )
+ except TTransport.TException as e:
+ raise RuntimeError("execution of fetch window batch fails because: ", e)
diff --git a/client-py/iotdb/dbapi/Cursor.py b/client-py/iotdb/dbapi/Cursor.py
index a1d6e2caab..a6a4e24e7d 100644
--- a/client-py/iotdb/dbapi/Cursor.py
+++ b/client-py/iotdb/dbapi/Cursor.py
@@ -136,7 +136,7 @@ class Cursor(object):
rows = []
if data_set:
- data = data_set.todf()
+ data = data_set.to_df()
if self.__sqlalchemy_mode and time_index:
time_column = data.columns[0]
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index de0fe7728c..99417b0b80 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -48,6 +48,7 @@ class IoTDBRpcDataSet(object):
session_id,
query_data_set,
fetch_size,
+ is_rpc_fetch_result
):
self.__statement_id = statement_id
self.__session_id = session_id
@@ -58,6 +59,7 @@ class IoTDBRpcDataSet(object):
self.__fetch_size = fetch_size
self.__column_size = len(column_name_list)
self.__default_time_out = 1000
+ self.__is_rpc_fetch_result = is_rpc_fetch_result
self.__column_name_list = []
self.__column_type_list = []
@@ -137,7 +139,7 @@ class IoTDBRpcDataSet(object):
return True
if self.__empty_resultSet:
return False
- if self.fetch_results():
+ if self.__is_rpc_fetch_result and self.fetch_results():
self.construct_one_row()
return True
return False
@@ -152,14 +154,14 @@ class IoTDBRpcDataSet(object):
return True
if self.__empty_resultSet:
return False
- if self.fetch_results():
+ if self.__is_rpc_fetch_result and self.fetch_results():
return True
return False
def _to_bitstring(self, b):
return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b))
- def resultset_to_pandas(self):
+ def resultset_to_numpy(self):
result = {}
for column_name in self.__column_name_list:
result[column_name] = None
@@ -278,9 +280,10 @@ class IoTDBRpcDataSet(object):
for k, v in result.items():
if v is None:
result[k] = []
+ return result
- df = pd.DataFrame(result)
- return df
+ def resultset_to_pandas(self):
+ return pd.DataFrame(self.resultset_to_numpy())
def construct_one_row(self):
# simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
diff --git a/client-py/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py
index 02eef027df..b9c08de453 100644
--- a/client-py/iotdb/utils/SessionDataSet.py
+++ b/client-py/iotdb/utils/SessionDataSet.py
@@ -32,17 +32,17 @@ logger = logging.getLogger("IoTDB")
class SessionDataSet(object):
def __init__(
- self,
- sql,
- column_name_list,
- column_type_list,
- column_name_index,
- query_id,
- client,
- statement_id,
- session_id,
- query_data_set,
- ignore_timestamp,
+ self,
+ sql,
+ column_name_list,
+ column_type_list,
+ column_name_index,
+ query_id,
+ client,
+ statement_id,
+ session_id,
+ query_data_set,
+ ignore_timestamp,
):
self.iotdb_rpc_data_set = IoTDBRpcDataSet(
sql,
@@ -56,7 +56,32 @@ class SessionDataSet(object):
session_id,
query_data_set,
1024,
+ True
+ )
+
+ @classmethod
+ def init_from_window(self, column_name_list,
+ column_type_list,
+ column_name_index,
+ statement_id,
+ session_id,
+ query_data_set
+ ):
+ self.iotdb_rpc_data_set = IoTDBRpcDataSet(
+ "",
+ column_name_list,
+ column_type_list,
+ column_name_index,
+ False,
+ -1,
+ None,
+ statement_id,
+ session_id,
+ query_data_set,
+ 1024,
+ False
)
+ return self
def __enter__(self):
return self
@@ -96,8 +121,8 @@ class SessionDataSet(object):
data_set_column_index -= 1
column_name = self.iotdb_rpc_data_set.get_column_names()[index]
location = (
- self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
- - IoTDBRpcDataSet.START_INDEX
+ self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
+ - IoTDBRpcDataSet.START_INDEX
)
if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
@@ -136,9 +161,12 @@ class SessionDataSet(object):
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
- def todf(self):
+ def to_df(self):
return resultset_to_pandas(self)
+ def to_numpy(self):
+ return resultset_to_numpy(self)
+
def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
"""
@@ -150,6 +178,16 @@ def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
return result_set.iotdb_rpc_data_set.resultset_to_pandas()
+def resultset_to_numpy(result_set: SessionDataSet):
+ """
+ Transforms a SessionDataSet from IoTDB to a Numpy array
+ Each Field from IoTDB is a column
+ :param result_set:
+ :return:
+ """
+ return result_set.iotdb_rpc_data_set.resultset_to_numpy()
+
+
def get_typed_point(field: Field, none_value=None):
choices = {
# In Case of Boolean, cast to 0 / 1
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index c7cce58ea5..0c59d6127f 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -34,7 +34,7 @@ def test_simple_query():
# Read
session_data_set = session.execute_query_statement("SELECT ** FROM root")
- df = session_data_set.todf()
+ df = session_data_set.to_df()
session.close()
@@ -54,7 +54,7 @@ def test_non_time_query():
# Read
session_data_set = session.execute_query_statement("SHOW TIMESERIES")
- df = session_data_set.todf()
+ df = session_data_set.to_df()
session.close()
diff --git a/client-py/tests/test_tablet.py b/client-py/tests/test_tablet.py
index 1e80277d77..a3fb8dac6d 100644
--- a/client-py/tests/test_tablet.py
+++ b/client-py/tests/test_tablet.py
@@ -61,7 +61,7 @@ def test_tablet_insertion():
session_data_set = session.execute_query_statement(
"select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
)
- df_output = session_data_set.todf()
+ df_output = session_data_set.to_df()
df_output = df_output[df_input.columns.tolist()]
session.close()
@@ -104,7 +104,7 @@ def test_nullable_tablet_insertion():
session_data_set = session.execute_query_statement(
"select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
)
- df_output = session_data_set.todf()
+ df_output = session_data_set.to_df()
df_output = df_output[df_input.columns.tolist()]
session.close()
diff --git a/client-py/tests/test_todf.py b/client-py/tests/test_todf.py
index 07953446cf..03fe2e76a5 100644
--- a/client-py/tests/test_todf.py
+++ b/client-py/tests/test_todf.py
@@ -94,7 +94,7 @@ def test_simple_query():
df_input.insert(0, "Time", timestamps)
session_data_set = session.execute_query_statement("SELECT ** FROM root")
- df_output = session_data_set.todf()
+ df_output = session_data_set.to_df()
df_output = df_output[df_input.columns.tolist()]
session.close()
@@ -174,7 +174,7 @@ def test_with_null_query():
df_input.insert(0, "Time", timestamps)
session_data_set = session.execute_query_statement("SELECT ** FROM root")
- df_output = session_data_set.todf()
+ df_output = session_data_set.to_df()
df_output = df_output[df_input.columns.tolist()]
session.close()
@@ -212,7 +212,7 @@ def test_multi_fetch():
session_data_set = session.execute_query_statement("SELECT ** FROM root")
session_data_set.set_fetch_size(100)
- df_output = session_data_set.todf()
+ df_output = session_data_set.to_df()
df_output = df_output[df_input.columns.tolist()]
session.close()