You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/03/09 03:23:36 UTC

[iotdb] branch master updated: [IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 270fcc3  [IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)
270fcc3 is described below

commit 270fcc33aba917361ad61b2b1d08e8f96e5ddeb1
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Mar 9 11:22:51 2022 +0800

    [IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)
---
 client-py/SessionExample.py                      |  26 ++-
 client-py/SessionTest.py                         |  23 ++-
 client-py/iotdb/utils/NumpyTablet.py             | 107 ++++++++++
 client-py/iotdb/utils/Tablet.py                  | 244 ++++++++++-------------
 client-py/requirements.txt                       |   5 +-
 client-py/tests/tablet_performance_comparison.py |  13 +-
 6 files changed, 270 insertions(+), 148 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index e4f42d1..21a1702 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -17,10 +17,12 @@
 #
 
 # Uncomment the following line to use apache-iotdb module installed by pip3
+import numpy as np
 
 from iotdb.Session import Session
 from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
 from iotdb.utils.Tablet import Tablet
+from iotdb.utils.NumpyTablet import NumpyTablet
 
 # creating session connection.
 ip = "127.0.0.1"
@@ -50,7 +52,7 @@ session.create_time_series(
 )
 session.create_time_series(
     "root.sg_test_01.d_02.s_01",
-    TSDataType.INT64,
+    TSDataType.BOOLEAN,
     TSEncoding.PLAIN,
     Compressor.SNAPPY,
     None,
@@ -174,6 +176,21 @@ tablet_ = Tablet(
 )
 session.insert_tablet(tablet_)
 
+# insert one numpy tablet into the database.
+np_values_ = [
+    np.array([False, True, False, True], np.dtype('>?')),
+    np.array([10, 100, 100, 0], np.dtype('>i4')),
+    np.array([11, 11111, 1, 0], np.dtype('>i8')),
+    np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
+    np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
+    ["test01", "test02", "test03", "test04"],
+]
+np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
+np_tablet_ = NumpyTablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+)
+session.insert_tablet(np_tablet_)
+
 # insert multiple tablets into database
 tablet_01 = Tablet(
     "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
@@ -226,6 +243,13 @@ with session.execute_query_statement(
     session_data_set.set_fetch_size(1024)
     while session_data_set.has_next():
         print(session_data_set.next())
+# execute sql query statement
+with session.execute_query_statement(
+    "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
+) as session_data_set:
+    session_data_set.set_fetch_size(1024)
+    while session_data_set.has_next():
+        print(session_data_set.next())
 
 # close session connection.
 session.close()
diff --git a/client-py/SessionTest.py b/client-py/SessionTest.py
index e96d4dd..5435df3 100644
--- a/client-py/SessionTest.py
+++ b/client-py/SessionTest.py
@@ -17,8 +17,11 @@
 #
 
 # Uncomment the following line to use apache-iotdb module installed by pip3
+import numpy as np
+
 from iotdb.Session import Session
 from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.NumpyTablet import NumpyTablet
 from iotdb.utils.Tablet import Tablet
 
 # whether the test has passed
@@ -77,7 +80,7 @@ session.create_time_series(
 )
 session.create_time_series(
     "root.sg_test_01.d_02.s_01",
-    TSDataType.INT64,
+    TSDataType.BOOLEAN,
     TSEncoding.PLAIN,
     Compressor.SNAPPY,
     None,
@@ -216,10 +219,28 @@ timestamps_ = [4, 5, 6, 7]
 tablet_ = Tablet(
     "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
 )
+
 if session.insert_tablet(tablet_) < 0:
     test_fail()
     print_message("insert tablet failed")
 
+# insert one numpy tablet into the database.
+np_values_ = [
+    np.array([False, True, False, True], np.dtype('>?')),
+    np.array([10, 100, 100, 0], np.dtype('>i4')),
+    np.array([11, 11111, 1, 0], np.dtype('>i8')),
+    np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
+    np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
+    ["test01", "test02", "test03", "test04"],
+]
+np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
+np_tablet_ = NumpyTablet(
+    "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+)
+if session.insert_tablet(np_tablet_) < 0:
+    test_fail()
+    print_message("insert numpy tablet failed")
+
 # insert multiple tablets into database
 tablet_01 = Tablet(
     "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py
new file mode 100644
index 0000000..72a83fe
--- /dev/null
+++ b/client-py/iotdb/utils/NumpyTablet.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct
+
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.BitMap import BitMap
+
+
+class NumpyTablet(object):
+    def __init__(
+        self, device_id, measurements, data_types, values, timestamps
+    ):
+        """
+        creating a numpy tablet for insertion
+          for example, considering device: root.sg1.d1
+            timestamps,     m1,    m2,     m3
+                     1,  125.3,  True,  text1
+                     2,  111.6, False,  text2
+                     3,  688.6,  True,  text3
+        Notice: From 0.13.0, the tablet can contain empty cell
+                The input is stored as given; it is NOT sorted by timestamps at initialization
+        :param device_id: String, IoTDB time series path to device layer (without sensor)
+        :param measurements: List, sensors
+        :param data_types: TSDataType List, specify value types for sensors
+        :param values: List with one entry per column: a numpy array for each non-TEXT column, a list of str for each TEXT column
+        :param timestamps: Numpy array, the timestamps
+        """
+        self.__values = values
+        self.__timestamps = timestamps
+        self.__device_id = device_id
+        self.__measurements = measurements
+        self.__data_types = data_types
+        self.__row_number = len(timestamps)
+        self.__column_number = len(measurements)
+
+    @staticmethod
+    def check_sorted(timestamps):
+        for i in range(1, len(timestamps)):
+            if timestamps[i] < timestamps[i - 1]:
+                return False
+        return True
+
+    def get_measurements(self):
+        return self.__measurements
+
+    def get_data_types(self):
+        return self.__data_types
+
+    def get_row_number(self):
+        return self.__row_number
+
+    def get_device_id(self):
+        return self.__device_id
+
+    def get_binary_timestamps(self):
+        return self.__timestamps.tobytes()
+
+    def get_binary_values(self):
+        bs_len = 0
+        bs_list = []
+        for i, value in enumerate(self.__values):
+            if self.__data_types[i] == TSDataType.TEXT:
+                format_str_list = [">"]
+                values_tobe_packed = []
+                for str_list in value:
+                    # For TEXT, it's the same as the original solution
+                    value_bytes = bytes(str_list, "utf-8")
+                    format_str_list.append("i")
+                    format_str_list.append(str(len(value_bytes)))
+                    format_str_list.append("s")
+                    values_tobe_packed.append(len(value_bytes))
+                    values_tobe_packed.append(value_bytes)
+                format_str = "".join(format_str_list)
+                bs = struct.pack(format_str, *values_tobe_packed)
+            else:
+                bs = value.tobytes()
+            bs_list.append(bs)
+            bs_len += len(bs)
+        ret = memoryview(bytearray(bs_len))
+        offset = 0
+        for bs in bs_list:
+            _l = len(bs)
+            ret[offset : offset + _l] = bs
+            offset += _l
+        return ret
+
+    def __mark_none_value(self, bitmaps, bitmap, column, row):
+        if bitmap is None:
+            bitmap = BitMap(self.__row_number)
+            bitmaps.insert(column, bitmap)
+        bitmap.mark(row)
diff --git a/client-py/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py
index dc75546..bf64118 100644
--- a/client-py/iotdb/utils/Tablet.py
+++ b/client-py/iotdb/utils/Tablet.py
@@ -24,7 +24,7 @@ from iotdb.utils.BitMap import BitMap
 
 class Tablet(object):
     def __init__(
-        self, device_id, measurements, data_types, values, timestamps, use_new=False
+        self, device_id, measurements, data_types, values, timestamps
     ):
         """
         creating a tablet for insertion
@@ -35,14 +35,13 @@ class Tablet(object):
                      3,  688.6,  True,  text3
         Notice: From 0.13.0, the tablet can contain empty cell
                 The tablet will be sorted at the initialization by timestamps
-
-        :param device_id: String, IoTDB time series path to device layer (without sensor).
-        :param measurements: List, sensors.
-        :param data_types: TSDataType List, specify value types for sensors.
-        :param values: 2-D List, the values of each row should be the outer list element.
-        :param timestamps: List.
+        :param device_id: String, IoTDB time series path to device layer (without sensor)
+        :param measurements: List, sensors
+        :param data_types: TSDataType List, specify value types for sensors
+        :param values: 2-D List, the values of each row should be the outer list element
+        :param timestamps: List
         """
-        if not use_new and len(timestamps) != len(values):
+        if len(timestamps) != len(values):
             raise RuntimeError(
                 "Input error! len(timestamps) does not equal to len(values)!"
             )
@@ -60,7 +59,6 @@ class Tablet(object):
         self.__data_types = data_types
         self.__row_number = len(timestamps)
         self.__column_number = len(measurements)
-        self.__use_new = use_new
 
     @staticmethod
     def check_sorted(timestamps):
@@ -82,147 +80,115 @@ class Tablet(object):
         return self.__device_id
 
     def get_binary_timestamps(self):
-        if not self.__use_new:
-            format_str_list = [">"]
-            values_tobe_packed = []
-            for timestamp in self.__timestamps:
-                format_str_list.append("q")
-                values_tobe_packed.append(timestamp)
+        format_str_list = [">"]
+        values_tobe_packed = []
+        for timestamp in self.__timestamps:
+            format_str_list.append("q")
+            values_tobe_packed.append(timestamp)
 
-            format_str = "".join(format_str_list)
-            return struct.pack(format_str, *values_tobe_packed)
-        else:
-            return self.__timestamps.tobytes()
+        format_str = "".join(format_str_list)
+        return struct.pack(format_str, *values_tobe_packed)
 
     def get_binary_values(self):
-        if not self.__use_new:
-            format_str_list = [">"]
-            values_tobe_packed = []
-            bitmaps = []
-            has_none = False
-            for i in range(self.__column_number):
-                bitmap = None
-                bitmaps.insert(i, bitmap)
-                if self.__data_types[i] == TSDataType.BOOLEAN:
-                    format_str_list.append(str(self.__row_number))
-                    format_str_list.append("?")
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            values_tobe_packed.append(self.__values[j][i])
-                        else:
-                            values_tobe_packed.append(False)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                elif self.__data_types[i] == TSDataType.INT32:
-                    format_str_list.append(str(self.__row_number))
-                    format_str_list.append("i")
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            values_tobe_packed.append(self.__values[j][i])
-                        else:
-                            values_tobe_packed.append(0)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                elif self.__data_types[i] == TSDataType.INT64:
-                    format_str_list.append(str(self.__row_number))
-                    format_str_list.append("q")
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            values_tobe_packed.append(self.__values[j][i])
-                        else:
-                            values_tobe_packed.append(0)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                elif self.__data_types[i] == TSDataType.FLOAT:
-                    format_str_list.append(str(self.__row_number))
-                    format_str_list.append("f")
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            values_tobe_packed.append(self.__values[j][i])
-                        else:
-                            values_tobe_packed.append(0)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                elif self.__data_types[i] == TSDataType.DOUBLE:
-                    format_str_list.append(str(self.__row_number))
-                    format_str_list.append("d")
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            values_tobe_packed.append(self.__values[j][i])
-                        else:
-                            values_tobe_packed.append(0)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                elif self.__data_types[i] == TSDataType.TEXT:
-                    for j in range(self.__row_number):
-                        if self.__values[j][i] is not None:
-                            value_bytes = bytes(self.__values[j][i], "utf-8")
-                            format_str_list.append("i")
-                            format_str_list.append(str(len(value_bytes)))
-                            format_str_list.append("s")
-                            values_tobe_packed.append(len(value_bytes))
-                            values_tobe_packed.append(value_bytes)
-                        else:
-                            value_bytes = bytes("", "utf-8")
-                            format_str_list.append("i")
-                            format_str_list.append(str(len(value_bytes)))
-                            format_str_list.append("s")
-                            values_tobe_packed.append(len(value_bytes))
-                            values_tobe_packed.append(value_bytes)
-                            self.__mark_none_value(bitmaps, bitmap, i, j)
-                            has_none = True
-
-                else:
-                    raise RuntimeError(
-                        "Unsupported data type:" + str(self.__data_types[i])
-                    )
-
-            if has_none:
-                for i in range(self.__column_number):
-                    format_str_list.append("?")
-                    if bitmaps[i] is None:
+        format_str_list = [">"]
+        values_tobe_packed = []
+        bitmaps = []
+        has_none = False
+        for i in range(self.__column_number):
+            bitmap = None
+            bitmaps.insert(i, bitmap)
+            if self.__data_types[i] == TSDataType.BOOLEAN:
+                format_str_list.append(str(self.__row_number))
+                format_str_list.append("?")
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        values_tobe_packed.append(self.__values[j][i])
+                    else:
                         values_tobe_packed.append(False)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            elif self.__data_types[i] == TSDataType.INT32:
+                format_str_list.append(str(self.__row_number))
+                format_str_list.append("i")
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        values_tobe_packed.append(self.__values[j][i])
                     else:
-                        values_tobe_packed.append(True)
-                        format_str_list.append(str(self.__row_number // 8 + 1))
-                        format_str_list.append("c")
-                        for j in range(self.__row_number // 8 + 1):
-                            values_tobe_packed.append(bytes([bitmaps[i].bits[j]]))
-            format_str = "".join(format_str_list)
-            return struct.pack(format_str, *values_tobe_packed)
-        else:
-            bs_len = 0
-            bs_list = []
-            for i, value in enumerate(self.__values):
-                if self.__data_types[i] == TSDataType.TEXT:
-                    format_str_list = [">"]
-                    values_tobe_packed = []
-                    for str_list in value:
-                        # Fot TEXT, it's same as the original solution
-                        value_bytes = bytes(str_list, "utf-8")
+                        values_tobe_packed.append(0)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            elif self.__data_types[i] == TSDataType.INT64:
+                format_str_list.append(str(self.__row_number))
+                format_str_list.append("q")
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        values_tobe_packed.append(self.__values[j][i])
+                    else:
+                        values_tobe_packed.append(0)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            elif self.__data_types[i] == TSDataType.FLOAT:
+                format_str_list.append(str(self.__row_number))
+                format_str_list.append("f")
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        values_tobe_packed.append(self.__values[j][i])
+                    else:
+                        values_tobe_packed.append(0)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            elif self.__data_types[i] == TSDataType.DOUBLE:
+                format_str_list.append(str(self.__row_number))
+                format_str_list.append("d")
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        values_tobe_packed.append(self.__values[j][i])
+                    else:
+                        values_tobe_packed.append(0)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            elif self.__data_types[i] == TSDataType.TEXT:
+                for j in range(self.__row_number):
+                    if self.__values[j][i] is not None:
+                        value_bytes = bytes(self.__values[j][i], "utf-8")
                         format_str_list.append("i")
                         format_str_list.append(str(len(value_bytes)))
                         format_str_list.append("s")
                         values_tobe_packed.append(len(value_bytes))
                         values_tobe_packed.append(value_bytes)
-                    format_str = "".join(format_str_list)
-                    bs = struct.pack(format_str, *values_tobe_packed)
+                    else:
+                        value_bytes = bytes("", "utf-8")
+                        format_str_list.append("i")
+                        format_str_list.append(str(len(value_bytes)))
+                        format_str_list.append("s")
+                        values_tobe_packed.append(len(value_bytes))
+                        values_tobe_packed.append(value_bytes)
+                        self.__mark_none_value(bitmaps, bitmap, i, j)
+                        has_none = True
+
+            else:
+                raise RuntimeError(
+                    "Unsupported data type:" + str(self.__data_types[i])
+                )
+
+        if has_none:
+            for i in range(self.__column_number):
+                format_str_list.append("?")
+                if bitmaps[i] is None:
+                    values_tobe_packed.append(False)
                 else:
-                    bs = value.tobytes()
-                bs_list.append(bs)
-                bs_len += len(bs)
-            ret = memoryview(bytearray(bs_len))
-            offset = 0
-            for bs in bs_list:
-                _l = len(bs)
-                ret[offset : offset + _l] = bs
-                offset += _l
-            return ret
+                    values_tobe_packed.append(True)
+                    format_str_list.append(str(self.__row_number // 8 + 1))
+                    format_str_list.append("c")
+                    for j in range(self.__row_number // 8 + 1):
+                        values_tobe_packed.append(bytes([bitmaps[i].bits[j]]))
+        format_str = "".join(format_str_list)
+        return struct.pack(format_str, *values_tobe_packed)
 
     def __mark_none_value(self, bitmaps, bitmap, column, row):
         if bitmap is None:
diff --git a/client-py/requirements.txt b/client-py/requirements.txt
index 55839d9..566c75f 100644
--- a/client-py/requirements.txt
+++ b/client-py/requirements.txt
@@ -17,6 +17,7 @@
 #
 
 # Pandas Export
-pandas==1.2.3
+pandas~=1.3.5
 # Testcontainer
-testcontainers==3.3.0
\ No newline at end of file
+testcontainers==3.3.0
+numpy~=1.21.4
\ No newline at end of file
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
index 4cbbe53..76d26f8 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -24,6 +24,7 @@ import pandas as pd
 
 from iotdb.Session import Session
 from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
 from iotdb.utils.Tablet import Tablet
 
 # the data type specified the byte order (i.e. endian)
@@ -207,8 +208,11 @@ def performance_test(
                 for m in measurements:
                     value_array.append(csv_data.at[t, m])
                 values.append(value_array)
+            tablet = Tablet(
+                device_id, measurements, data_types, values, timestamps_
+            )
         else:
-            # Use the NEW method to construct tablet
+            # Use the NEW method to construct numpy tablet
             timestamps_ = csv_data[TIME_STR].values
             if timestamps_.dtype != FORMAT_CHAR_OF_TYPES[TSDataType.INT64]:
                 timestamps_ = timestamps_.astype(FORMAT_CHAR_OF_TYPES[TSDataType.INT64])
@@ -220,10 +224,9 @@ def performance_test(
                     if not (tstype == TSDataType.TEXT and value_array.dtype == object):
                         value_array = value_array.astype(type_char)
                 values.append(value_array)
-
-        tablet = Tablet(
-            device_id, measurements, data_types, values, timestamps_, use_new=use_new
-        )
+            tablet = NumpyTablet(
+                device_id, measurements, data_types, values, timestamps_
+            )
         cost_st = time.perf_counter()
         session.insert_tablet(tablet)
         insert_cost += time.perf_counter() - cost_st