You are viewing a plain-text version of this content. The link to its canonical (HTML) version was not preserved in this plain-text view.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 12:48:54 UTC

[iotdb] branch ml/windowSet updated (511a6686f9 -> a12c7fb3fa)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 511a6686f9 add python session (tmp save)
     new 9fd4877885 finish python session
     new db3eab0588 support shuffle
     new a12c7fb3fa support function

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 client-py/README.md                                |   2 +-
 client-py/SessionExample.py                        | 762 +++++++++++----------
 client-py/iotdb/Session.py                         | 224 +++---
 client-py/iotdb/dbapi/Cursor.py                    |   2 +-
 client-py/iotdb/utils/IoTDBRpcDataSet.py           |  13 +-
 client-py/iotdb/utils/SessionDataSet.py            |  66 +-
 client-py/tests/test_dataframe.py                  |   4 +-
 client-py/tests/test_tablet.py                     |   4 +-
 client-py/tests/test_todf.py                       |   6 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  33 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  12 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   8 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  38 +-
 13 files changed, 650 insertions(+), 524 deletions(-)


[iotdb] 03/03: support function

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a12c7fb3fa00eb230fd91fa7913cd00c0811beea
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 14 20:48:30 2022 +0800

    support function
---
 client-py/SessionExample.py                        |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 33 ++++++++++++++--------
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index e50521ad18..fa65a8d614 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -48,7 +48,7 @@ fetch_args = {
     "indexes": [9, 0, 5, 3]
 }
 
-print(session.fetch_window_batch(ts_path_list, None, fetch_args))
+print(session.fetch_window_batch(ts_path_list, "sin", fetch_args))
 
 # # set and delete storage groups
 # session.set_storage_group("root.sg_test_01")
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 76beb5b7ca..a1811979b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1230,7 +1230,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     // set source
     List<MeasurementPath> measurementPaths = schemaTree.getAllMeasurement();
     Set<Expression> sourceExpressions =
-        measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet());
+        measurementPaths.stream()
+            .map(TimeSeriesOperand::new)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
     for (Expression sourceExpression : sourceExpressions) {
       analyzeExpression(analysis, sourceExpression);
     }
@@ -1247,21 +1249,30 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
                           functionName,
                           new LinkedHashMap<>(),
                           Collections.singletonList(expression)))
-              .collect(Collectors.toSet());
+              .collect(Collectors.toCollection(LinkedHashSet::new));
       for (Expression sourceTransformExpression : sourceTransformExpressions) {
         analyzeExpression(analysis, sourceTransformExpression);
       }
       analysis.setSourceTransformExpressions(sourceTransformExpressions);
-    }
 
-    // set output
-    List<ColumnHeader> columnHeaders =
-        measurementPaths.stream()
-            .map(
-                measurementPath ->
-                    new ColumnHeader(measurementPath.toString(), measurementPath.getSeriesType()))
-            .collect(Collectors.toList());
-    analysis.setRespDatasetHeader(new DatasetHeader(columnHeaders, false));
+      // set output
+      List<ColumnHeader> columnHeaders =
+          sourceTransformExpressions.stream()
+              .map(
+                  expression ->
+                      new ColumnHeader(expression.toString(), analysis.getType(expression)))
+              .collect(Collectors.toList());
+      analysis.setRespDatasetHeader(new DatasetHeader(columnHeaders, false));
+    } else {
+      // set output
+      List<ColumnHeader> columnHeaders =
+          measurementPaths.stream()
+              .map(
+                  measurementPath ->
+                      new ColumnHeader(measurementPath.toString(), measurementPath.getSeriesType()))
+              .collect(Collectors.toList());
+      analysis.setRespDatasetHeader(new DatasetHeader(columnHeaders, false));
+    }
 
     Set<String> deviceSet =
         measurementPaths.stream().map(PartialPath::getDevice).collect(Collectors.toSet());


[iotdb] 01/03: finish python session

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9fd48778854f9c41077a3d1047e592a2d8469f91
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 14 20:04:29 2022 +0800

    finish python session
---
 client-py/README.md                      |   2 +-
 client-py/SessionExample.py              | 762 ++++++++++++++++---------------
 client-py/iotdb/Session.py               | 224 +++++----
 client-py/iotdb/dbapi/Cursor.py          |   2 +-
 client-py/iotdb/utils/IoTDBRpcDataSet.py |  13 +-
 client-py/iotdb/utils/SessionDataSet.py  |  66 ++-
 client-py/tests/test_dataframe.py        |   4 +-
 client-py/tests/test_tablet.py           |   4 +-
 client-py/tests/test_todf.py             |   6 +-
 9 files changed, 586 insertions(+), 497 deletions(-)

diff --git a/client-py/README.md b/client-py/README.md
index 3197cc7186..c124b32881 100644
--- a/client-py/README.md
+++ b/client-py/README.md
@@ -386,7 +386,7 @@ session.open(False)
 result = session.execute_query_statement("SELECT * FROM root.*")
 
 # Transform to Pandas Dataset
-df = result.todf()
+df = result.to_df()
 
 session.close()
 
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 61e82234db..4f04855def 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -35,382 +35,398 @@ password_ = "root"
 session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
 session.open(False)
 
-# set and delete storage groups
-session.set_storage_group("root.sg_test_01")
-session.set_storage_group("root.sg_test_02")
-session.set_storage_group("root.sg_test_03")
-session.set_storage_group("root.sg_test_04")
-session.delete_storage_group("root.sg_test_02")
-session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
-
-# setting time series.
-session.create_time_series(
-    "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
-)
-session.create_time_series(
-    "root.sg_test_01.d_02.s_01",
-    TSDataType.BOOLEAN,
-    TSEncoding.PLAIN,
-    Compressor.SNAPPY,
-    None,
-    {"tag1": "v1"},
-    {"description": "v1"},
-    "temperature",
-)
-
-# setting multiple time series once.
-ts_path_lst_ = [
-    "root.sg_test_01.d_01.s_04",
-    "root.sg_test_01.d_01.s_05",
-    "root.sg_test_01.d_01.s_06",
-    "root.sg_test_01.d_01.s_07",
-    "root.sg_test_01.d_01.s_08",
-    "root.sg_test_01.d_01.s_09",
-]
-data_type_lst_ = [
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
-    ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
-)
-
-ts_path_lst_ = [
-    "root.sg_test_01.d_02.s_04",
-    "root.sg_test_01.d_02.s_05",
-    "root.sg_test_01.d_02.s_06",
-    "root.sg_test_01.d_02.s_07",
-    "root.sg_test_01.d_02.s_08",
-    "root.sg_test_01.d_02.s_09",
-]
-data_type_lst_ = [
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
-]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
-attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(
-    ts_path_lst_,
-    data_type_lst_,
-    encoding_lst_,
-    compressor_lst_,
-    None,
-    tags_lst_,
-    attributes_lst_,
-    None,
-)
-
-# delete time series
-session.delete_time_series(
-    [
-        "root.sg_test_01.d_01.s_07",
-        "root.sg_test_01.d_01.s_08",
-        "root.sg_test_01.d_01.s_09",
-    ]
-)
-
-# checking time series
-print(
-    "s_07 expecting False, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
-)
-print(
-    "s_03 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
-)
-print(
-    "d_02.s_01 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
-)
-print(
-    "d_02.s_06 expecting True, checking result: ",
-    session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
-)
-
-# insert one record into the database.
-measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
-values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
-data_types_ = [
-    TSDataType.BOOLEAN,
-    TSDataType.INT32,
-    TSDataType.INT64,
-    TSDataType.FLOAT,
-    TSDataType.DOUBLE,
-    TSDataType.TEXT,
+ts_path_list = [
+    "root.sg1.d1.s1",
+    "root.sg1.d1.s2"
 ]
-session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
-
-# insert multiple records into database
-measurements_list_ = [
-    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
-    ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
-]
-values_list_ = [
-    [False, 22, 33, 4.4, 55.1, "test_records01"],
-    [True, 77, 88, 1.25, 8.125, "test_records02"],
-]
-data_type_list_ = [data_types_, data_types_]
-device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
-session.insert_records(
-    device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
-)
-
-# insert one tablet into the database.
-values_ = [
-    [False, 10, 11, 1.1, 10011.1, "test01"],
-    [True, 100, 11111, 1.25, 101.0, "test02"],
-    [False, 100, 1, 188.1, 688.25, "test03"],
-    [True, 0, 0, 0, 6.25, "test04"],
-]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [4, 5, 6, 7]
-tablet_ = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert one numpy tablet into the database.
-np_values_ = [
-    np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
-    np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
-    np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
-    np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
-    np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
-    np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
-]
-np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
-np_tablet_ = NumpyTablet(
-    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
-)
-session.insert_tablet(np_tablet_)
-
-# insert one unsorted numpy tablet into the database.
-np_values_unsorted = [
-    np.array([False, False, False, True, True], np.dtype(">?")),
-    np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
-    np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
-    np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
-    np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
-    np.array(["test09", "test08", "test07", "test06", "test05"]),
-]
-np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
-np_tablet_unsorted = NumpyTablet(
-    "root.sg_test_01.d_02",
-    measurements_,
-    data_types_,
-    np_values_unsorted,
-    np_timestamps_unsorted,
-)
-session.insert_tablet(np_tablet_unsorted)
-print(np_tablet_unsorted.get_timestamps())
-for value in np_tablet_unsorted.get_values():
-    print(value)
-
-# insert multiple tablets into database
-tablet_01 = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
-)
-tablet_02 = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
-)
-session.insert_tablets([tablet_01, tablet_02])
-
-# insert one tablet with empty cells into the database.
-values_ = [
-    [None, 10, 11, 1.1, 10011.1, "test01"],
-    [True, None, 11111, 1.25, 101.0, "test02"],
-    [False, 100, 1, None, 688.25, "test03"],
-    [True, 0, 0, 0, 6.25, None],
-]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
-timestamps_ = [16, 17, 18, 19]
-tablet_ = Tablet(
-    "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
-)
-session.insert_tablet(tablet_)
-
-# insert records of one device
-time_list = [1, 2, 3]
-measurements_list = [
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-]
-data_types_list = [
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-    [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
-]
-values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
-
-session.insert_records_of_one_device(
-    "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
-)
-
-# execute non-query sql statement
-session.execute_non_query_statement(
-    "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# execute sql query statement
-with session.execute_query_statement(
-    "select * from root.sg_test_01.d_01"
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-# execute sql query statement
-with session.execute_query_statement(
-    "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-# execute statement
-with session.execute_statement(
-    "select * from root.sg_test_01.d_01"
-) as session_data_set:
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-session.execute_statement(
-    "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
-)
-
-# insert string records of one device
-time_list = [1, 2, 3]
-measurements_list = [
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-    ["s_01", "s_02", "s_03"],
-]
-values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
-
-session.insert_string_records_of_one_device(
-    "root.sg_test_01.d_03",
-    time_list,
-    measurements_list,
-    values_list,
-)
-
-with session.execute_raw_data_query(
-    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-with session.execute_last_data_query(
-    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
-) as session_data_set:
-    session_data_set.set_fetch_size(1024)
-    while session_data_set.has_next():
-        print(session_data_set.next())
-
-# delete storage group
-session.delete_storage_group("root.sg_test_01")
-
-# create measurement node template
-template = Template(name="template_python", share_time=False)
-m_node_1 = MeasurementNode(
-    name="s1",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-m_node_2 = MeasurementNode(
-    name="s2",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-m_node_3 = MeasurementNode(
-    name="s3",
-    data_type=TSDataType.INT64,
-    encoding=TSEncoding.RLE,
-    compression_type=Compressor.SNAPPY,
-)
-template.add_template(m_node_1)
-template.add_template(m_node_2)
-template.add_template(m_node_3)
-session.create_schema_template(template)
-print("create template success template_python")
-
-# create internal node template
-template_name = "treeTemplate_python"
-template = Template(name=template_name, share_time=True)
-i_node_gps = InternalNode(name="GPS", share_time=False)
-i_node_v = InternalNode(name="vehicle", share_time=True)
-m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
-
-i_node_gps.add_child(m_node_x)
-i_node_v.add_child(m_node_x)
-template.add_template(i_node_gps)
-template.add_template(i_node_v)
-template.add_template(m_node_x)
 
-session.create_schema_template(template)
-print("create template success treeTemplate_python}")
-
-print(session.is_measurement_in_template(template_name, "GPS"))
-print(session.is_measurement_in_template(template_name, "GPS.x"))
-print(session.show_all_templates())
-
-# # append schema template
-data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
-encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
-compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
-
-measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
-session.add_measurements_in_template(
-    template_name,
-    measurements_aligned_path,
-    data_types,
-    encoding_list,
-    compressor_list,
-    is_aligned=True,
-)
-# session.drop_schema_template("add_template_python")
-measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
-session.add_measurements_in_template(
-    template_name,
-    measurements_aligned_path,
-    data_types,
-    encoding_list,
-    compressor_list,
-    is_aligned=False,
-)
-session.delete_node_in_template(template_name, "aligned.s1")
-print(session.count_measurements_in_template(template_name))
-print(session.is_path_exist_in_template(template_name, "aligned.s1"))
-print(session.is_path_exist_in_template(template_name, "aligned.s2"))
-
-session.set_schema_template(template_name, "root.python.set")
-print(session.show_paths_template_using_on(template_name))
-print(session.show_paths_template_set_on(template_name))
-session.unset_schema_template(template_name, "root.python.set")
+fetch_args = {
+    "start_time": 0,
+    "end_time": 32,
+    "interval": 4,
+    "sliding_step": 1,
+    "indexes": [0, 3, 5, 9]
+}
+
+print(session.fetch_window_batch(ts_path_list, None, fetch_args))
+
+# # set and delete storage groups
+# session.set_storage_group("root.sg_test_01")
+# session.set_storage_group("root.sg_test_02")
+# session.set_storage_group("root.sg_test_03")
+# session.set_storage_group("root.sg_test_04")
+# session.delete_storage_group("root.sg_test_02")
+# session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+#
+# # setting time series.
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
+# )
+# session.create_time_series(
+#     "root.sg_test_01.d_02.s_01",
+#     TSDataType.BOOLEAN,
+#     TSEncoding.PLAIN,
+#     Compressor.SNAPPY,
+#     None,
+#     {"tag1": "v1"},
+#     {"description": "v1"},
+#     "temperature",
+# )
+#
+# # setting multiple time series once.
+# ts_path_lst_ = [
+#     "root.sg_test_01.d_01.s_04",
+#     "root.sg_test_01.d_01.s_05",
+#     "root.sg_test_01.d_01.s_06",
+#     "root.sg_test_01.d_01.s_07",
+#     "root.sg_test_01.d_01.s_08",
+#     "root.sg_test_01.d_01.s_09",
+# ]
+# data_type_lst_ = [
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+#     ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+# )
+#
+# ts_path_lst_ = [
+#     "root.sg_test_01.d_02.s_04",
+#     "root.sg_test_01.d_02.s_05",
+#     "root.sg_test_01.d_02.s_06",
+#     "root.sg_test_01.d_02.s_07",
+#     "root.sg_test_01.d_02.s_08",
+#     "root.sg_test_01.d_02.s_09",
+# ]
+# data_type_lst_ = [
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+# compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+# tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
+# attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
+# session.create_multi_time_series(
+#     ts_path_lst_,
+#     data_type_lst_,
+#     encoding_lst_,
+#     compressor_lst_,
+#     None,
+#     tags_lst_,
+#     attributes_lst_,
+#     None,
+# )
+#
+# # delete time series
+# session.delete_time_series(
+#     [
+#         "root.sg_test_01.d_01.s_07",
+#         "root.sg_test_01.d_01.s_08",
+#         "root.sg_test_01.d_01.s_09",
+#     ]
+# )
+#
+# # checking time series
+# print(
+#     "s_07 expecting False, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
+# )
+# print(
+#     "s_03 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
+# )
+# print(
+#     "d_02.s_01 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
+# )
+# print(
+#     "d_02.s_06 expecting True, checking result: ",
+#     session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
+# )
+#
+# # insert one record into the database.
+# measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+# values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+# data_types_ = [
+#     TSDataType.BOOLEAN,
+#     TSDataType.INT32,
+#     TSDataType.INT64,
+#     TSDataType.FLOAT,
+#     TSDataType.DOUBLE,
+#     TSDataType.TEXT,
+# ]
+# session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
+#
+# # insert multiple records into database
+# measurements_list_ = [
+#     ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+#     ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+# ]
+# values_list_ = [
+#     [False, 22, 33, 4.4, 55.1, "test_records01"],
+#     [True, 77, 88, 1.25, 8.125, "test_records02"],
+# ]
+# data_type_list_ = [data_types_, data_types_]
+# device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+# session.insert_records(
+#     device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+# )
+#
+# # insert one tablet into the database.
+# values_ = [
+#     [False, 10, 11, 1.1, 10011.1, "test01"],
+#     [True, 100, 11111, 1.25, 101.0, "test02"],
+#     [False, 100, 1, 188.1, 688.25, "test03"],
+#     [True, 0, 0, 0, 6.25, "test04"],
+# ]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [4, 5, 6, 7]
+# tablet_ = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert one numpy tablet into the database.
+# np_values_ = [
+#     np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
+#     np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
+#     np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
+#     np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
+#     np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
+#     np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
+# ]
+# np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
+# np_tablet_ = NumpyTablet(
+#     "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+# )
+# session.insert_tablet(np_tablet_)
+#
+# # insert one unsorted numpy tablet into the database.
+# np_values_unsorted = [
+#     np.array([False, False, False, True, True], np.dtype(">?")),
+#     np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")),
+#     np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")),
+#     np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")),
+#     np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")),
+#     np.array(["test09", "test08", "test07", "test06", "test05"]),
+# ]
+# np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8"))
+# np_tablet_unsorted = NumpyTablet(
+#     "root.sg_test_01.d_02",
+#     measurements_,
+#     data_types_,
+#     np_values_unsorted,
+#     np_timestamps_unsorted,
+# )
+# session.insert_tablet(np_tablet_unsorted)
+# print(np_tablet_unsorted.get_timestamps())
+# for value in np_tablet_unsorted.get_values():
+#     print(value)
+#
+# # insert multiple tablets into database
+# tablet_01 = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
+# )
+# tablet_02 = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
+# )
+# session.insert_tablets([tablet_01, tablet_02])
+#
+# # insert one tablet with empty cells into the database.
+# values_ = [
+#     [None, 10, 11, 1.1, 10011.1, "test01"],
+#     [True, None, 11111, 1.25, 101.0, "test02"],
+#     [False, 100, 1, None, 688.25, "test03"],
+#     [True, 0, 0, 0, 6.25, None],
+# ]  # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+# timestamps_ = [16, 17, 18, 19]
+# tablet_ = Tablet(
+#     "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+# )
+# session.insert_tablet(tablet_)
+#
+# # insert records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+# ]
+# data_types_list = [
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+#     [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+# ]
+# values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+#
+# session.insert_records_of_one_device(
+#     "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
+# )
+#
+# # execute non-query sql statement
+# session.execute_non_query_statement(
+#     "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # execute sql query statement
+# with session.execute_query_statement(
+#     "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+# # execute sql query statement
+# with session.execute_query_statement(
+#     "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# # execute statement
+# with session.execute_statement(
+#     "select * from root.sg_test_01.d_01"
+# ) as session_data_set:
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# session.execute_statement(
+#     "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+# )
+#
+# # insert string records of one device
+# time_list = [1, 2, 3]
+# measurements_list = [
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+#     ["s_01", "s_02", "s_03"],
+# ]
+# values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", "26"]]
+#
+# session.insert_string_records_of_one_device(
+#     "root.sg_test_01.d_03",
+#     time_list,
+#     measurements_list,
+#     values_list,
+# )
+#
+# with session.execute_raw_data_query(
+#     ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# with session.execute_last_data_query(
+#     ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
+# ) as session_data_set:
+#     session_data_set.set_fetch_size(1024)
+#     while session_data_set.has_next():
+#         print(session_data_set.next())
+#
+# # delete storage group
+# session.delete_storage_group("root.sg_test_01")
+#
+# # create measurement node template
+# template = Template(name="template_python", share_time=False)
+# m_node_1 = MeasurementNode(
+#     name="s1",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# m_node_2 = MeasurementNode(
+#     name="s2",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# m_node_3 = MeasurementNode(
+#     name="s3",
+#     data_type=TSDataType.INT64,
+#     encoding=TSEncoding.RLE,
+#     compression_type=Compressor.SNAPPY,
+# )
+# template.add_template(m_node_1)
+# template.add_template(m_node_2)
+# template.add_template(m_node_3)
+# session.create_schema_template(template)
+# print("create template success template_python")
+#
+# # create internal node template
+# template_name = "treeTemplate_python"
+# template = Template(name=template_name, share_time=True)
+# i_node_gps = InternalNode(name="GPS", share_time=False)
+# i_node_v = InternalNode(name="vehicle", share_time=True)
+# m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
+#
+# i_node_gps.add_child(m_node_x)
+# i_node_v.add_child(m_node_x)
+# template.add_template(i_node_gps)
+# template.add_template(i_node_v)
+# template.add_template(m_node_x)
+#
+# session.create_schema_template(template)
+# print("create template success treeTemplate_python}")
+#
+# print(session.is_measurement_in_template(template_name, "GPS"))
+# print(session.is_measurement_in_template(template_name, "GPS.x"))
+# print(session.show_all_templates())
+#
+# # # append schema template
+# data_types = [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.DOUBLE]
+# encoding_list = [TSEncoding.RLE, TSEncoding.RLE, TSEncoding.GORILLA]
+# compressor_list = [Compressor.SNAPPY, Compressor.SNAPPY, Compressor.LZ4]
+#
+# measurements_aligned_path = ["aligned.s1", "aligned.s2", "aligned.s3"]
+# session.add_measurements_in_template(
+#     template_name,
+#     measurements_aligned_path,
+#     data_types,
+#     encoding_list,
+#     compressor_list,
+#     is_aligned=True,
+# )
+# # session.drop_schema_template("add_template_python")
+# measurements_aligned_path = ["unaligned.s1", "unaligned.s2", "unaligned.s3"]
+# session.add_measurements_in_template(
+#     template_name,
+#     measurements_aligned_path,
+#     data_types,
+#     encoding_list,
+#     compressor_list,
+#     is_aligned=False,
+# )
+# session.delete_node_in_template(template_name, "aligned.s1")
+# print(session.count_measurements_in_template(template_name))
+# print(session.is_path_exist_in_template(template_name, "aligned.s1"))
+# print(session.is_path_exist_in_template(template_name, "aligned.s2"))
+#
+# session.set_schema_template(template_name, "root.python.set")
+# print(session.show_paths_template_using_on(template_name))
+# print(session.show_paths_template_set_on(template_name))
+# session.unset_schema_template(template_name, "root.python.set")
+#
+# # drop template
+# session.drop_schema_template("template_python")
+# session.drop_schema_template(template_name)
+# print("drop template success, template_python and treeTemplate_python")
 
-# drop template
-session.drop_schema_template("template_python")
-session.drop_schema_template(template_name)
-print("drop template success, template_python and treeTemplate_python")
 # close session connection.
 session.close()
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 471dc98b62..64be51e9ac 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -18,6 +18,8 @@
 import logging
 import struct
 import time
+
+import numpy as np
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.transport import TSocket, TTransport
 
@@ -53,6 +55,8 @@ from .thrift.rpc.ttypes import (
     TSRawDataQueryReq,
     TSLastDataQueryReq,
     TSInsertStringRecordsOfOneDeviceReq,
+    TGroupByTimeParameter,
+    TSFetchWindowBatchReq,
 )
 # for debug
 # from IoTDBConstants import *
@@ -78,13 +82,13 @@ class Session(object):
     DEFAULT_ZONE_ID = time.strftime("%z")
 
     def __init__(
-        self,
-        host,
-        port,
-        user=DEFAULT_USER,
-        password=DEFAULT_PASSWORD,
-        fetch_size=DEFAULT_FETCH_SIZE,
-        zone_id=DEFAULT_ZONE_ID,
+            self,
+            host,
+            port,
+            user=DEFAULT_USER,
+            password=DEFAULT_PASSWORD,
+            fetch_size=DEFAULT_FETCH_SIZE,
+            zone_id=DEFAULT_ZONE_ID,
     ):
         self.__host = host
         self.__port = port
@@ -206,15 +210,15 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_time_series(
-        self,
-        ts_path,
-        data_type,
-        encoding,
-        compressor,
-        props=None,
-        tags=None,
-        attributes=None,
-        alias=None,
+            self,
+            ts_path,
+            data_type,
+            encoding,
+            compressor,
+            props=None,
+            tags=None,
+            attributes=None,
+            alias=None,
     ):
         """
         create single time series
@@ -249,7 +253,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_aligned_time_series(
-        self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
+            self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
     ):
         """
         create aligned time series
@@ -281,15 +285,15 @@ class Session(object):
         return Session.verify_success(status)
 
     def create_multi_time_series(
-        self,
-        ts_path_lst,
-        data_type_lst,
-        encoding_lst,
-        compressor_lst,
-        props_lst=None,
-        tags_lst=None,
-        attributes_lst=None,
-        alias_lst=None,
+            self,
+            ts_path_lst,
+            data_type_lst,
+            encoding_lst,
+            compressor_lst,
+            props_lst=None,
+            tags_lst=None,
+            attributes_lst=None,
+            alias_lst=None,
     ):
         """
         create multiple time series
@@ -386,7 +390,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_str_record(
-        self, device_id, timestamp, measurements, string_values
+            self, device_id, timestamp, measurements, string_values
     ):
         """special case for inserting one row of String (TEXT) value"""
         if type(string_values) == str:
@@ -432,7 +436,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         insert multiple rows of data, records are independent to each other, in other words, there's no relationship
@@ -460,7 +464,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_record(
-        self, device_id, timestamp, measurements, data_types, values
+            self, device_id, timestamp, measurements, data_types, values
     ):
         """
         insert one row of aligned record into database, if you want improve your performance, please use insertTablet method
@@ -487,7 +491,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship
@@ -515,7 +519,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def test_insert_record(
-        self, device_id, timestamp, measurements, data_types, values
+            self, device_id, timestamp, measurements, data_types, values
     ):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -540,7 +544,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def test_insert_records(
-        self, device_ids, times, measurements_lst, types_lst, values_lst
+            self, device_ids, times, measurements_lst, types_lst, values_lst
     ):
         """
         this method NOT insert data into database and the server just return after accept the request, this method
@@ -566,7 +570,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+            self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -583,7 +587,7 @@ class Session(object):
         )
 
     def gen_insert_str_record_req(
-        self, device_id, timestamp, measurements, data_types, values, is_aligned=False
+            self, device_id, timestamp, measurements, data_types, values, is_aligned=False
     ):
         if (len(values) != len(data_types)) or (len(values) != len(measurements)):
             raise RuntimeError(
@@ -594,19 +598,19 @@ class Session(object):
         )
 
     def gen_insert_records_req(
-        self,
-        device_ids,
-        times,
-        measurements_lst,
-        types_lst,
-        values_lst,
-        is_aligned=False,
+            self,
+            device_ids,
+            times,
+            measurements_lst,
+            types_lst,
+            values_lst,
+            is_aligned=False,
     ):
         if (
-            (len(device_ids) != len(measurements_lst))
-            or (len(times) != len(types_lst))
-            or (len(device_ids) != len(times))
-            or (len(times) != len(values_lst))
+                (len(device_ids) != len(measurements_lst))
+                or (len(times) != len(types_lst))
+                or (len(device_ids) != len(times))
+                or (len(times) != len(values_lst))
         ):
             raise RuntimeError(
                 "deviceIds, times, measurementsList and valuesList's size should be equal"
@@ -614,7 +618,7 @@ class Session(object):
 
         value_lst = []
         for values, data_types, measurements in zip(
-            values_lst, types_lst, measurements_lst
+                values_lst, types_lst, measurements_lst
         ):
             if (len(values) != len(data_types)) or (len(values) != len(measurements)):
                 raise RuntimeError(
@@ -697,7 +701,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_records_of_one_device(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         # sort by timestamp
         sorted_zipped = sorted(
@@ -713,7 +717,7 @@ class Session(object):
         )
 
     def insert_records_of_one_device_sorted(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         """
         Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
@@ -730,9 +734,9 @@ class Session(object):
         # check parameter
         size = len(times_list)
         if (
-            size != len(measurements_list)
-            or size != len(types_list)
-            or size != len(values_list)
+                size != len(measurements_list)
+                or size != len(types_list)
+                or size != len(values_list)
         ):
             raise RuntimeError(
                 "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -755,7 +759,7 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_records_of_one_device(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         # sort by timestamp
         sorted_zipped = sorted(
@@ -771,7 +775,7 @@ class Session(object):
         )
 
     def insert_aligned_records_of_one_device_sorted(
-        self, device_id, times_list, measurements_list, types_list, values_list
+            self, device_id, times_list, measurements_list, types_list, values_list
     ):
         """
         Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc
@@ -787,9 +791,9 @@ class Session(object):
         # check parameter
         size = len(times_list)
         if (
-            size != len(measurements_list)
-            or size != len(types_list)
-            or size != len(values_list)
+                size != len(measurements_list)
+                or size != len(types_list)
+                or size != len(values_list)
         ):
             raise RuntimeError(
                 "insert records of one device error: types, times, measurementsList and valuesList's size should be equal"
@@ -812,17 +816,17 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_records_of_one_device_request(
-        self,
-        device_id,
-        times_list,
-        measurements_list,
-        values_list,
-        types_list,
-        is_aligned=False,
+            self,
+            device_id,
+            times_list,
+            measurements_list,
+            values_list,
+            types_list,
+            is_aligned=False,
     ):
         binary_value_list = []
         for values, data_types, measurements in zip(
-            values_list, types_list, measurements_list
+                values_list, types_list, measurements_list
         ):
             data_types = [data_type.value for data_type in data_types]
             if (len(values) != len(data_types)) or (len(values) != len(measurements)):
@@ -1037,7 +1041,7 @@ class Session(object):
         return -1
 
     def execute_raw_data_query(
-        self, paths: list, start_time: int, end_time: int
+            self, paths: list, start_time: int, end_time: int
     ) -> SessionDataSet:
         """
         execute query statement and returns SessionDataSet
@@ -1100,12 +1104,12 @@ class Session(object):
         )
 
     def insert_string_records_of_one_device(
-        self,
-        device_id: str,
-        times: list,
-        measurements_list: list,
-        values_list: list,
-        have_sorted: bool = False,
+            self,
+            device_id: str,
+            times: list,
+            measurements_list: list,
+            values_list: list,
+            have_sorted: bool = False,
     ):
         """
         insert multiple row of string record into database:
@@ -1132,12 +1136,12 @@ class Session(object):
         return Session.verify_success(status)
 
     def insert_aligned_string_records_of_one_device(
-        self,
-        device_id: str,
-        times: list,
-        measurements_list: list,
-        values: list,
-        have_sorted: bool = False,
+            self,
+            device_id: str,
+            times: list,
+            measurements_list: list,
+            values: list,
+            have_sorted: bool = False,
     ):
         if (len(times) != len(measurements_list)) or (len(times) != len(values)):
             raise RuntimeError(
@@ -1154,13 +1158,13 @@ class Session(object):
         return Session.verify_success(status)
 
     def gen_insert_string_records_of_one_device_request(
-        self,
-        device_id,
-        times,
-        measurements_list,
-        values_list,
-        have_sorted,
-        is_aligned=False,
+            self,
+            device_id,
+            times,
+            measurements_list,
+            values_list,
+            have_sorted,
+            is_aligned=False,
     ):
         if (len(times) != len(measurements_list)) or (len(times) != len(values_list)):
             raise RuntimeError(
@@ -1244,13 +1248,13 @@ class Session(object):
             raise RuntimeError("execution of statement fails because: ", e)
 
     def add_measurements_in_template(
-        self,
-        template_name: str,
-        measurements_path: list,
-        data_types: list,
-        encodings: list,
-        compressors: list,
-        is_aligned: bool = False,
+            self,
+            template_name: str,
+            measurements_path: list,
+            data_types: list,
+            encodings: list,
+            compressors: list,
+            is_aligned: bool = False,
     ):
         """
         add measurements in the template, the template must already create. This function adds some measurements node.
@@ -1452,10 +1456,38 @@ class Session(object):
         )
         return response.measurements
 
-    def fetch_window_batch(self, query_paths : list, function_name : str, fetch_args):
+    def fetch_window_batch(self, query_paths: list, function_name: str, fetch_args):
         request = TSFetchWindowBatchReq(
-            self.__session_id,query_paths,function_name
-            fetch_args.
+            self.__session_id,
+            self.__statement_id,
+            query_paths,
+            function_name,
+            TGroupByTimeParameter(fetch_args["start_time"], fetch_args["end_time"], fetch_args["interval"],
+                                  fetch_args["sliding_step"],
+                                  fetch_args["indexes"])
         )
-        response = self.__client.fetch_window_batch(request)
-        return
\ No newline at end of file
+        try:
+            resp = self.__client.fetchWindowBatch(request)
+            status = resp.status
+
+            if Session.verify_success(status) == 0:
+                window_batch = []
+                for window_result_set in resp.windowBatchDataSetList:
+                    window_session_data_set = SessionDataSet.init_from_window(
+                        resp.columnNameList,
+                        resp.columnTypeList,
+                        resp.columnNameIndexMap,
+                        self.__statement_id,
+                        self.__session_id,
+                        window_result_set
+                    )
+
+                    window_df = window_session_data_set.to_df(window_session_data_set)
+                    window_batch.append(window_df)
+                return np.array(window_batch)
+            else:
+                raise RuntimeError(
+                    "execution of fetch window batch fails because: {}", status.message
+                )
+        except TTransport.TException as e:
+            raise RuntimeError("execution of fetch window batch fails because: ", e)
diff --git a/client-py/iotdb/dbapi/Cursor.py b/client-py/iotdb/dbapi/Cursor.py
index a1d6e2caab..a6a4e24e7d 100644
--- a/client-py/iotdb/dbapi/Cursor.py
+++ b/client-py/iotdb/dbapi/Cursor.py
@@ -136,7 +136,7 @@ class Cursor(object):
             rows = []
 
             if data_set:
-                data = data_set.todf()
+                data = data_set.to_df()
 
                 if self.__sqlalchemy_mode and time_index:
                     time_column = data.columns[0]
diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py
index de0fe7728c..99417b0b80 100644
--- a/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -48,6 +48,7 @@ class IoTDBRpcDataSet(object):
         session_id,
         query_data_set,
         fetch_size,
+        is_rpc_fetch_result
     ):
         self.__statement_id = statement_id
         self.__session_id = session_id
@@ -58,6 +59,7 @@ class IoTDBRpcDataSet(object):
         self.__fetch_size = fetch_size
         self.__column_size = len(column_name_list)
         self.__default_time_out = 1000
+        self.__is_rpc_fetch_result = is_rpc_fetch_result
 
         self.__column_name_list = []
         self.__column_type_list = []
@@ -137,7 +139,7 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__is_rpc_fetch_result and self.fetch_results():
             self.construct_one_row()
             return True
         return False
@@ -152,14 +154,14 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__is_rpc_fetch_result and self.fetch_results():
             return True
         return False
 
     def _to_bitstring(self, b):
         return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b))
 
-    def resultset_to_pandas(self):
+    def resultset_to_numpy(self):
         result = {}
         for column_name in self.__column_name_list:
             result[column_name] = None
@@ -278,9 +280,10 @@ class IoTDBRpcDataSet(object):
         for k, v in result.items():
             if v is None:
                 result[k] = []
+        return result
 
-        df = pd.DataFrame(result)
-        return df
+    def resultset_to_pandas(self):
+        return pd.DataFrame(self.resultset_to_numpy())
 
     def construct_one_row(self):
         # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
diff --git a/client-py/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py
index 02eef027df..b9c08de453 100644
--- a/client-py/iotdb/utils/SessionDataSet.py
+++ b/client-py/iotdb/utils/SessionDataSet.py
@@ -32,17 +32,17 @@ logger = logging.getLogger("IoTDB")
 
 class SessionDataSet(object):
     def __init__(
-        self,
-        sql,
-        column_name_list,
-        column_type_list,
-        column_name_index,
-        query_id,
-        client,
-        statement_id,
-        session_id,
-        query_data_set,
-        ignore_timestamp,
+            self,
+            sql,
+            column_name_list,
+            column_type_list,
+            column_name_index,
+            query_id,
+            client,
+            statement_id,
+            session_id,
+            query_data_set,
+            ignore_timestamp,
     ):
         self.iotdb_rpc_data_set = IoTDBRpcDataSet(
             sql,
@@ -56,7 +56,32 @@ class SessionDataSet(object):
             session_id,
             query_data_set,
             1024,
+            True
+        )
+
+    @classmethod
+    def init_from_window(self, column_name_list,
+                         column_type_list,
+                         column_name_index,
+                         statement_id,
+                         session_id,
+                         query_data_set
+                         ):
+        self.iotdb_rpc_data_set = IoTDBRpcDataSet(
+            "",
+            column_name_list,
+            column_type_list,
+            column_name_index,
+            False,
+            -1,
+            None,
+            statement_id,
+            session_id,
+            query_data_set,
+            1024,
+            False
         )
+        return self
 
     def __enter__(self):
         return self
@@ -96,8 +121,8 @@ class SessionDataSet(object):
                 data_set_column_index -= 1
             column_name = self.iotdb_rpc_data_set.get_column_names()[index]
             location = (
-                self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
-                - IoTDBRpcDataSet.START_INDEX
+                    self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
+                    - IoTDBRpcDataSet.START_INDEX
             )
 
             if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
@@ -136,9 +161,12 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
-    def todf(self):
+    def to_df(self):
         return resultset_to_pandas(self)
 
+    def to_numpy(self):
+        return resultset_to_numpy(self)
+
 
 def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
     """
@@ -150,6 +178,16 @@ def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
     return result_set.iotdb_rpc_data_set.resultset_to_pandas()
 
 
+def resultset_to_numpy(result_set: SessionDataSet):
+    """
+    Transforms a SessionDataSet from IoTDB to a Numpy array
+    Each Field from IoTDB is a column
+    :param result_set:
+    :return:
+    """
+    return result_set.iotdb_rpc_data_set.resultset_to_numpy()
+
+
 def get_typed_point(field: Field, none_value=None):
     choices = {
         # In Case of Boolean, cast to 0 / 1
diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index c7cce58ea5..0c59d6127f 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -34,7 +34,7 @@ def test_simple_query():
 
         # Read
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df = session_data_set.todf()
+        df = session_data_set.to_df()
 
         session.close()
 
@@ -54,7 +54,7 @@ def test_non_time_query():
 
         # Read
         session_data_set = session.execute_query_statement("SHOW TIMESERIES")
-        df = session_data_set.todf()
+        df = session_data_set.to_df()
 
         session.close()
 
diff --git a/client-py/tests/test_tablet.py b/client-py/tests/test_tablet.py
index 1e80277d77..a3fb8dac6d 100644
--- a/client-py/tests/test_tablet.py
+++ b/client-py/tests/test_tablet.py
@@ -61,7 +61,7 @@ def test_tablet_insertion():
         session_data_set = session.execute_query_statement(
             "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
         )
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -104,7 +104,7 @@ def test_nullable_tablet_insertion():
         session_data_set = session.execute_query_statement(
             "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_01"
         )
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
diff --git a/client-py/tests/test_todf.py b/client-py/tests/test_todf.py
index 07953446cf..03fe2e76a5 100644
--- a/client-py/tests/test_todf.py
+++ b/client-py/tests/test_todf.py
@@ -94,7 +94,7 @@ def test_simple_query():
         df_input.insert(0, "Time", timestamps)
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -174,7 +174,7 @@ def test_with_null_query():
         df_input.insert(0, "Time", timestamps)
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()
@@ -212,7 +212,7 @@ def test_multi_fetch():
 
         session_data_set = session.execute_query_statement("SELECT ** FROM root")
         session_data_set.set_fetch_size(100)
-        df_output = session_data_set.todf()
+        df_output = session_data_set.to_df()
         df_output = df_output[df_input.columns.tolist()]
 
         session.close()


[iotdb] 02/03: support shuffle

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit db3eab0588888c61626efe9faac5f743b66813ec
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Nov 14 20:31:48 2022 +0800

    support shuffle
---
 client-py/SessionExample.py                        |  2 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    | 12 ++++---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  8 +++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 38 +++++++++++++++++-----
 4 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 4f04855def..e50521ad18 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -45,7 +45,7 @@ fetch_args = {
     "end_time": 32,
     "interval": 4,
     "sliding_step": 1,
-    "indexes": [0, 3, 5, 9]
+    "indexes": [9, 0, 5, 3]
 }
 
 print(session.fetch_window_batch(ts_path_list, None, fetch_args))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 3d3c02b2e8..14496b69cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * This visitor is used to generate a logical plan for the statement and returns the {@link
@@ -294,16 +295,17 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitFetchWindowBatch(
       FetchWindowBatchStatement fetchWindowBatchStatement, MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+    List<Integer> sortedSamplingIndexes =
+        fetchWindowBatchStatement.getSamplingIndexes().stream()
+            .sorted()
+            .collect(Collectors.toList());
     planBuilder
         .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
         .planTransform(
             analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
-        .planWindowSplit(
-            fetchWindowBatchStatement.getGroupByTimeParameter(),
-            fetchWindowBatchStatement.getSamplingIndexes())
+        .planWindowSplit(fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes)
         .planWindowConcat(
-            fetchWindowBatchStatement.getGroupByTimeParameter(),
-            fetchWindowBatchStatement.getSamplingIndexes());
+            fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes);
 
     return planBuilder.getRoot();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index ab027fb138..814fd22deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -459,7 +460,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
         TSFetchWindowBatchResp resp =
             createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
         resp.setWindowBatchDataSetList(
-            QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(queryExecution));
+            QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(
+                queryExecution, ((FetchWindowBatchStatement) s).getSamplingIndexes()));
         return resp;
       }
     } catch (Exception e) {
@@ -514,7 +516,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
         TSFetchWindowBatchResp resp =
             createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
-        resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+        resp.setWindowBatch(
+            QueryDataSetUtils.convertTsBlocksToWindowBatch(
+                queryExecution, ((FetchWindowBatchStatement) s).getSamplingIndexes()));
         return resp;
       }
     } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 8880ca2810..b0d26a7b0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
@@ -400,9 +401,9 @@ public class QueryDataSetUtils {
     return res;
   }
 
-  public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(IQueryExecution queryExecution)
-      throws IoTDBException {
-    List<List<ByteBuffer>> windowSet = new ArrayList<>();
+  public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(
+      IQueryExecution queryExecution, List<Integer> samplingIndexes) throws IoTDBException {
+    List<List<ByteBuffer>> sortedWindowBatch = new ArrayList<>();
 
     while (true) {
       Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
@@ -423,14 +424,24 @@ public class QueryDataSetUtils {
         res.add(byteBuffer);
       }
 
-      windowSet.add(res);
+      sortedWindowBatch.add(res);
     }
-    return windowSet;
+
+    List<List<ByteBuffer>> windowBatch = new ArrayList<>(sortedWindowBatch.size());
+    List<Integer> sortedSamplingIndexes =
+        samplingIndexes.stream().sorted().collect(Collectors.toList());
+
+    for (Integer samplingIndex : samplingIndexes) {
+      int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex);
+      windowBatch.add(sortedWindowBatch.get(mapIndex));
+    }
+    return windowBatch;
   }
 
   public static List<TSQueryDataSet> convertTsBlocksToWindowBatchDataSetList(
-      IQueryExecution queryExecution) throws IoTDBException, IOException {
-    List<TSQueryDataSet> windowSet = new ArrayList<>();
+      IQueryExecution queryExecution, List<Integer> samplingIndexes)
+      throws IoTDBException, IOException {
+    List<TSQueryDataSet> sortedWindowBatch = new ArrayList<>();
 
     int columnNum = queryExecution.getOutputValueColumnCount();
     // one time column and each value column has an actual value buffer and a bitmap value to
@@ -626,9 +637,18 @@ public class QueryDataSetUtils {
       tsQueryDataSet.setBitmapList(bitmapList);
       tsQueryDataSet.setValueList(valueList);
 
-      windowSet.add(tsQueryDataSet);
+      sortedWindowBatch.add(tsQueryDataSet);
+    }
+
+    List<TSQueryDataSet> windowBatch = new ArrayList<>(sortedWindowBatch.size());
+    List<Integer> sortedSamplingIndexes =
+        samplingIndexes.stream().sorted().collect(Collectors.toList());
+
+    for (Integer samplingIndex : samplingIndexes) {
+      int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex);
+      windowBatch.add(sortedWindowBatch.get(mapIndex));
     }
-    return windowSet;
+    return windowBatch;
   }
 
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {