You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/03/09 03:23:36 UTC
[iotdb] branch master updated: [IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 270fcc3 [IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)
270fcc3 is described below
commit 270fcc33aba917361ad61b2b1d08e8f96e5ddeb1
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Mar 9 11:22:51 2022 +0800
[IOTDB-2703] Support insert ndarray Tablet in Python API (#5185)
---
client-py/SessionExample.py | 26 ++-
client-py/SessionTest.py | 23 ++-
client-py/iotdb/utils/NumpyTablet.py | 107 ++++++++++
client-py/iotdb/utils/Tablet.py | 244 ++++++++++-------------
client-py/requirements.txt | 5 +-
client-py/tests/tablet_performance_comparison.py | 13 +-
6 files changed, 270 insertions(+), 148 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index e4f42d1..21a1702 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -17,10 +17,12 @@
#
# Uncomment the following line to use apache-iotdb module installed by pip3
+import numpy as np
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
+from iotdb.utils.NumpyTablet import NumpyTablet
# creating session connection.
ip = "127.0.0.1"
@@ -50,7 +52,7 @@ session.create_time_series(
)
session.create_time_series(
"root.sg_test_01.d_02.s_01",
- TSDataType.INT64,
+ TSDataType.BOOLEAN,
TSEncoding.PLAIN,
Compressor.SNAPPY,
None,
@@ -174,6 +176,21 @@ tablet_ = Tablet(
)
session.insert_tablet(tablet_)
+# insert one numpy tablet into the database.
+np_values_ = [
+ np.array([False, True, False, True], np.dtype('>?')),
+ np.array([10, 100, 100, 0], np.dtype('>i4')),
+ np.array([11, 11111, 1, 0], np.dtype('>i8')),
+ np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
+ np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
+ ["test01", "test02", "test03", "test04"],
+]
+np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
+np_tablet_ = NumpyTablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+)
+session.insert_tablet(np_tablet_)
+
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
@@ -226,6 +243,13 @@ with session.execute_query_statement(
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
print(session_data_set.next())
+# execute sql query statement
+with session.execute_query_statement(
+ "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02"
+) as session_data_set:
+ session_data_set.set_fetch_size(1024)
+ while session_data_set.has_next():
+ print(session_data_set.next())
# close session connection.
session.close()
diff --git a/client-py/SessionTest.py b/client-py/SessionTest.py
index e96d4dd..5435df3 100644
--- a/client-py/SessionTest.py
+++ b/client-py/SessionTest.py
@@ -17,8 +17,11 @@
#
# Uncomment the following line to use apache-iotdb module installed by pip3
+import numpy as np
+
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.Tablet import Tablet
# whether the test has passed
@@ -77,7 +80,7 @@ session.create_time_series(
)
session.create_time_series(
"root.sg_test_01.d_02.s_01",
- TSDataType.INT64,
+ TSDataType.BOOLEAN,
TSEncoding.PLAIN,
Compressor.SNAPPY,
None,
@@ -216,10 +219,28 @@ timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
+
if session.insert_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet failed")
+# insert one numpy tablet into the database.
+np_values_ = [
+ np.array([False, True, False, True], np.dtype('>?')),
+ np.array([10, 100, 100, 0], np.dtype('>i4')),
+ np.array([11, 11111, 1, 0], np.dtype('>i8')),
+ np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
+ np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
+ ["test01", "test02", "test03", "test04"],
+]
+np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
+np_tablet_ = NumpyTablet(
+ "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
+)
+if session.insert_tablet(np_tablet_) < 0:
+ test_fail()
+ print_message("insert numpy tablet failed")
+
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py
new file mode 100644
index 0000000..72a83fe
--- /dev/null
+++ b/client-py/iotdb/utils/NumpyTablet.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct
+
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.BitMap import BitMap
+
+
+class NumpyTablet(object):
+ def __init__(
+ self, device_id, measurements, data_types, values, timestamps
+ ):
+ """
+ creating a numpy tablet for insertion
+ for example, considering device: root.sg1.d1
+ timestamps, m1, m2, m3
+ 1, 125.3, True, text1
+ 2, 111.6, False, text2
+ 3, 688.6, True, text3
+ Notice: From 0.13.0, the tablet can contain empty cell
+ The numpy tablet is NOT sorted at initialization; timestamps and values are serialized in the order given
+ :param device_id: String, IoTDB time series path to device layer (without sensor)
+ :param measurements: List, sensors
+ :param data_types: TSDataType List, specify value types for sensors
+ :param values: List of numpy array, the values of each column should be the inner numpy array
+ :param timestamps: Numpy array, the timestamps
+ """
+ self.__values = values
+ self.__timestamps = timestamps
+ self.__device_id = device_id
+ self.__measurements = measurements
+ self.__data_types = data_types
+ self.__row_number = len(timestamps)
+ self.__column_number = len(measurements)
+
+ @staticmethod
+ def check_sorted(timestamps):
+ for i in range(1, len(timestamps)):
+ if timestamps[i] < timestamps[i - 1]:
+ return False
+ return True
+
+ def get_measurements(self):
+ return self.__measurements
+
+ def get_data_types(self):
+ return self.__data_types
+
+ def get_row_number(self):
+ return self.__row_number
+
+ def get_device_id(self):
+ return self.__device_id
+
+ def get_binary_timestamps(self):
+ return self.__timestamps.tobytes()
+
+ def get_binary_values(self):
+ bs_len = 0
+ bs_list = []
+ for i, value in enumerate(self.__values):
+ if self.__data_types[i] == TSDataType.TEXT:
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for str_list in value:
+ # For TEXT, this is the same as the original solution
+ value_bytes = bytes(str_list, "utf-8")
+ format_str_list.append("i")
+ format_str_list.append(str(len(value_bytes)))
+ format_str_list.append("s")
+ values_tobe_packed.append(len(value_bytes))
+ values_tobe_packed.append(value_bytes)
+ format_str = "".join(format_str_list)
+ bs = struct.pack(format_str, *values_tobe_packed)
+ else:
+ bs = value.tobytes()
+ bs_list.append(bs)
+ bs_len += len(bs)
+ ret = memoryview(bytearray(bs_len))
+ offset = 0
+ for bs in bs_list:
+ _l = len(bs)
+ ret[offset : offset + _l] = bs
+ offset += _l
+ return ret
+
+ def __mark_none_value(self, bitmaps, bitmap, column, row):
+ if bitmap is None:
+ bitmap = BitMap(self.__row_number)
+ bitmaps.insert(column, bitmap)
+ bitmap.mark(row)
diff --git a/client-py/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py
index dc75546..bf64118 100644
--- a/client-py/iotdb/utils/Tablet.py
+++ b/client-py/iotdb/utils/Tablet.py
@@ -24,7 +24,7 @@ from iotdb.utils.BitMap import BitMap
class Tablet(object):
def __init__(
- self, device_id, measurements, data_types, values, timestamps, use_new=False
+ self, device_id, measurements, data_types, values, timestamps
):
"""
creating a tablet for insertion
@@ -35,14 +35,13 @@ class Tablet(object):
3, 688.6, True, text3
Notice: From 0.13.0, the tablet can contain empty cell
The tablet will be sorted at the initialization by timestamps
-
- :param device_id: String, IoTDB time series path to device layer (without sensor).
- :param measurements: List, sensors.
- :param data_types: TSDataType List, specify value types for sensors.
- :param values: 2-D List, the values of each row should be the outer list element.
- :param timestamps: List.
+ :param device_id: String, IoTDB time series path to device layer (without sensor)
+ :param measurements: List, sensors
+ :param data_types: TSDataType List, specify value types for sensors
+ :param values: 2-D List, the values of each row should be the outer list element
+ :param timestamps: List, the timestamps
"""
- if not use_new and len(timestamps) != len(values):
+ if len(timestamps) != len(values):
raise RuntimeError(
"Input error! len(timestamps) does not equal to len(values)!"
)
@@ -60,7 +59,6 @@ class Tablet(object):
self.__data_types = data_types
self.__row_number = len(timestamps)
self.__column_number = len(measurements)
- self.__use_new = use_new
@staticmethod
def check_sorted(timestamps):
@@ -82,147 +80,115 @@ class Tablet(object):
return self.__device_id
def get_binary_timestamps(self):
- if not self.__use_new:
- format_str_list = [">"]
- values_tobe_packed = []
- for timestamp in self.__timestamps:
- format_str_list.append("q")
- values_tobe_packed.append(timestamp)
+ format_str_list = [">"]
+ values_tobe_packed = []
+ for timestamp in self.__timestamps:
+ format_str_list.append("q")
+ values_tobe_packed.append(timestamp)
- format_str = "".join(format_str_list)
- return struct.pack(format_str, *values_tobe_packed)
- else:
- return self.__timestamps.tobytes()
+ format_str = "".join(format_str_list)
+ return struct.pack(format_str, *values_tobe_packed)
def get_binary_values(self):
- if not self.__use_new:
- format_str_list = [">"]
- values_tobe_packed = []
- bitmaps = []
- has_none = False
- for i in range(self.__column_number):
- bitmap = None
- bitmaps.insert(i, bitmap)
- if self.__data_types[i] == TSDataType.BOOLEAN:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("?")
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- values_tobe_packed.append(self.__values[j][i])
- else:
- values_tobe_packed.append(False)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- elif self.__data_types[i] == TSDataType.INT32:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("i")
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- values_tobe_packed.append(self.__values[j][i])
- else:
- values_tobe_packed.append(0)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- elif self.__data_types[i] == TSDataType.INT64:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("q")
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- values_tobe_packed.append(self.__values[j][i])
- else:
- values_tobe_packed.append(0)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- elif self.__data_types[i] == TSDataType.FLOAT:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("f")
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- values_tobe_packed.append(self.__values[j][i])
- else:
- values_tobe_packed.append(0)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- elif self.__data_types[i] == TSDataType.DOUBLE:
- format_str_list.append(str(self.__row_number))
- format_str_list.append("d")
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- values_tobe_packed.append(self.__values[j][i])
- else:
- values_tobe_packed.append(0)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- elif self.__data_types[i] == TSDataType.TEXT:
- for j in range(self.__row_number):
- if self.__values[j][i] is not None:
- value_bytes = bytes(self.__values[j][i], "utf-8")
- format_str_list.append("i")
- format_str_list.append(str(len(value_bytes)))
- format_str_list.append("s")
- values_tobe_packed.append(len(value_bytes))
- values_tobe_packed.append(value_bytes)
- else:
- value_bytes = bytes("", "utf-8")
- format_str_list.append("i")
- format_str_list.append(str(len(value_bytes)))
- format_str_list.append("s")
- values_tobe_packed.append(len(value_bytes))
- values_tobe_packed.append(value_bytes)
- self.__mark_none_value(bitmaps, bitmap, i, j)
- has_none = True
-
- else:
- raise RuntimeError(
- "Unsupported data type:" + str(self.__data_types[i])
- )
-
- if has_none:
- for i in range(self.__column_number):
- format_str_list.append("?")
- if bitmaps[i] is None:
+ format_str_list = [">"]
+ values_tobe_packed = []
+ bitmaps = []
+ has_none = False
+ for i in range(self.__column_number):
+ bitmap = None
+ bitmaps.insert(i, bitmap)
+ if self.__data_types[i] == TSDataType.BOOLEAN:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("?")
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ values_tobe_packed.append(self.__values[j][i])
+ else:
values_tobe_packed.append(False)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ elif self.__data_types[i] == TSDataType.INT32:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("i")
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ values_tobe_packed.append(self.__values[j][i])
else:
- values_tobe_packed.append(True)
- format_str_list.append(str(self.__row_number // 8 + 1))
- format_str_list.append("c")
- for j in range(self.__row_number // 8 + 1):
- values_tobe_packed.append(bytes([bitmaps[i].bits[j]]))
- format_str = "".join(format_str_list)
- return struct.pack(format_str, *values_tobe_packed)
- else:
- bs_len = 0
- bs_list = []
- for i, value in enumerate(self.__values):
- if self.__data_types[i] == TSDataType.TEXT:
- format_str_list = [">"]
- values_tobe_packed = []
- for str_list in value:
- # Fot TEXT, it's same as the original solution
- value_bytes = bytes(str_list, "utf-8")
+ values_tobe_packed.append(0)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ elif self.__data_types[i] == TSDataType.INT64:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("q")
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ values_tobe_packed.append(self.__values[j][i])
+ else:
+ values_tobe_packed.append(0)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ elif self.__data_types[i] == TSDataType.FLOAT:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("f")
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ values_tobe_packed.append(self.__values[j][i])
+ else:
+ values_tobe_packed.append(0)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ elif self.__data_types[i] == TSDataType.DOUBLE:
+ format_str_list.append(str(self.__row_number))
+ format_str_list.append("d")
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ values_tobe_packed.append(self.__values[j][i])
+ else:
+ values_tobe_packed.append(0)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ elif self.__data_types[i] == TSDataType.TEXT:
+ for j in range(self.__row_number):
+ if self.__values[j][i] is not None:
+ value_bytes = bytes(self.__values[j][i], "utf-8")
format_str_list.append("i")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
- format_str = "".join(format_str_list)
- bs = struct.pack(format_str, *values_tobe_packed)
+ else:
+ value_bytes = bytes("", "utf-8")
+ format_str_list.append("i")
+ format_str_list.append(str(len(value_bytes)))
+ format_str_list.append("s")
+ values_tobe_packed.append(len(value_bytes))
+ values_tobe_packed.append(value_bytes)
+ self.__mark_none_value(bitmaps, bitmap, i, j)
+ has_none = True
+
+ else:
+ raise RuntimeError(
+ "Unsupported data type:" + str(self.__data_types[i])
+ )
+
+ if has_none:
+ for i in range(self.__column_number):
+ format_str_list.append("?")
+ if bitmaps[i] is None:
+ values_tobe_packed.append(False)
else:
- bs = value.tobytes()
- bs_list.append(bs)
- bs_len += len(bs)
- ret = memoryview(bytearray(bs_len))
- offset = 0
- for bs in bs_list:
- _l = len(bs)
- ret[offset : offset + _l] = bs
- offset += _l
- return ret
+ values_tobe_packed.append(True)
+ format_str_list.append(str(self.__row_number // 8 + 1))
+ format_str_list.append("c")
+ for j in range(self.__row_number // 8 + 1):
+ values_tobe_packed.append(bytes([bitmaps[i].bits[j]]))
+ format_str = "".join(format_str_list)
+ return struct.pack(format_str, *values_tobe_packed)
def __mark_none_value(self, bitmaps, bitmap, column, row):
if bitmap is None:
diff --git a/client-py/requirements.txt b/client-py/requirements.txt
index 55839d9..566c75f 100644
--- a/client-py/requirements.txt
+++ b/client-py/requirements.txt
@@ -17,6 +17,7 @@
#
# Pandas Export
-pandas==1.2.3
+pandas~=1.3.5
# Testcontainer
-testcontainers==3.3.0
\ No newline at end of file
+testcontainers==3.3.0
+numpy~=1.21.4
\ No newline at end of file
diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py
index 4cbbe53..76d26f8 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -24,6 +24,7 @@ import pandas as pd
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.Tablet import Tablet
# the data type specifies the byte order (i.e. endianness)
@@ -207,8 +208,11 @@ def performance_test(
for m in measurements:
value_array.append(csv_data.at[t, m])
values.append(value_array)
+ tablet = Tablet(
+ device_id, measurements, data_types, values, timestamps_
+ )
else:
- # Use the NEW method to construct tablet
+ # Use the NEW method to construct numpy tablet
timestamps_ = csv_data[TIME_STR].values
if timestamps_.dtype != FORMAT_CHAR_OF_TYPES[TSDataType.INT64]:
timestamps_ = timestamps_.astype(FORMAT_CHAR_OF_TYPES[TSDataType.INT64])
@@ -220,10 +224,9 @@ def performance_test(
if not (tstype == TSDataType.TEXT and value_array.dtype == object):
value_array = value_array.astype(type_char)
values.append(value_array)
-
- tablet = Tablet(
- device_id, measurements, data_types, values, timestamps_, use_new=use_new
- )
+ tablet = NumpyTablet(
+ device_id, measurements, data_types, values, timestamps_
+ )
cost_st = time.perf_counter()
session.insert_tablet(tablet)
insert_cost += time.perf_counter() - cost_st