You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/10 08:18:56 UTC
[iotdb] branch master updated: [IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 13c5ba0b73 [IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)
13c5ba0b73 is described below
commit 13c5ba0b7333c6fde9bf72e2f1fb63740cb41a41
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Mar 10 16:18:50 2023 +0800
[IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)
---
.github/workflows/iotdb-ml.yml | 60 ++++++
mlnode/iotdb/mlnode/serde.py | 439 +++++++++++++++++++++++++++++++++++++++++
mlnode/requirements.txt | 22 +++
mlnode/requirements_dev.txt | 21 ++
mlnode/test/test_serde.py | 129 ++++++++++++
5 files changed, 671 insertions(+)
diff --git a/.github/workflows/iotdb-ml.yml b/.github/workflows/iotdb-ml.yml
new file mode 100644
index 0000000000..e1fb4c0820
--- /dev/null
+++ b/.github/workflows/iotdb-ml.yml
@@ -0,0 +1,60 @@
+# This workflow only checks whether modifications work for the MLNode
+
+name: MLNode
+
+on:
+ push:
+ branches:
+ - master
+ - 'rel/*'
+ - "new_*"
+ paths-ignore:
+ - 'docs/**'
+ - 'site/**'
+ pull_request:
+ branches:
+ - master
+ - 'rel/*'
+ - "new_*"
+ paths-ignore:
+ - 'docs/**'
+ - 'site/**'
+ # allow running the action manually:
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+env:
+ MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+ unix:
+ strategy:
+ fail-fast: false
+ max-parallel: 20
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Cache Maven packages
+ uses: actions/cache@v3
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-
+ - name: Build IoTDB server distribution zip
+ run: mvn -B clean install -pl distribution -am -DskipTests
+ - name: Build IoTDB server docker image
+ run: |
+ docker build . -f docker/src/main/Dockerfile-1c1d -t "iotdb:dev"
+ docker images
+ - name: Install IoTDB mlnode requirements
+ run: pip3 install -r mlnode/requirements_dev.txt
+ - name: Build MLNode
+ run: mvn clean package -DskipUTs -pl mlnode -am && pip3 install poetry && cd mlnode && poetry build && pip install dist/apache_iotdb_mlnode-1.0.0-py3-none-any.whl --force-reinstall
+ - name: Integration test
+ shell: bash
+ run: |
+ pytest mlnode
diff --git a/mlnode/iotdb/mlnode/serde.py b/mlnode/iotdb/mlnode/serde.py
new file mode 100644
index 0000000000..4f491db606
--- /dev/null
+++ b/mlnode/iotdb/mlnode/serde.py
@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import numpy as np
+import pandas as pd
+from iotdb.utils.IoTDBConstants import TSDataType
+
+# Name of the timestamp column in the DataFrame built by convert_to_df.
+TIMESTAMP_STR = "Time"
+# Ordinal given to the first value column: the timestamp column takes
+# ordinal 1, so (ordinal - START_INDEX) is a value column's zero-based
+# position inside a deserialized tsBlock.
+START_INDEX = 2
+
+
+def convert_to_df(name_list, type_list, name_index, binary_list):
+    """Convert serialized tsBlocks into a single pandas DataFrame.
+
+    Args:
+        name_list: column names of the query result.
+        type_list: ``TSDataType`` member names (e.g. ``"INT64"``), parallel
+            to ``name_list``.
+        name_index: optional mapping from column name to the position of
+            that column inside each serialized tsBlock. When ``None``, the
+            position is the order of first appearance in ``name_list``.
+        binary_list: serialized tsBlocks; each one is decoded with
+            ``deserialize`` and the decoded rows are appended in order.
+
+    Returns:
+        A DataFrame holding a ``"Time"`` column plus one column per distinct
+        name. Integer and boolean columns that contain nulls use the pandas
+        nullable dtypes (``Int32``/``Int64``/``boolean``); float columns use
+        NaN and text columns use ``None`` for missing values.
+
+    Raises:
+        KeyError: if a type name is not a ``TSDataType`` member or a column
+            name is missing from ``name_index``.
+        RuntimeError: if a column has a data type that cannot be decoded.
+    """
+    column_name_list = [TIMESTAMP_STR]
+    column_ordinal_dict = {TIMESTAMP_STR: 1}
+
+    if name_index is not None:
+        column_type_deduplicated_list = [None] * len(name_index)
+        for i, name in enumerate(name_list):
+            data_type = TSDataType[type_list[i]]
+            column_name_list.append(name)
+            if name not in column_ordinal_dict:
+                index = name_index[name]
+                column_ordinal_dict[name] = index + START_INDEX
+                column_type_deduplicated_list[index] = data_type
+    else:
+        column_type_deduplicated_list = []
+        for i, name in enumerate(name_list):
+            data_type = TSDataType[type_list[i]]
+            column_name_list.append(name)
+            if name not in column_ordinal_dict:
+                column_ordinal_dict[name] = (
+                    len(column_type_deduplicated_list) + START_INDEX
+                )
+                column_type_deduplicated_list.append(data_type)
+
+    result = {column_name: None for column_name in column_name_list}
+    time_dtype = np.dtype(np.longlong).newbyteorder(">")
+
+    for buffer in binary_list:
+        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
+        time_array = _to_native_byteorder(
+            np.frombuffer(time_column_values, time_dtype)
+        )
+        result[TIMESTAMP_STR] = _append_chunk(
+            result[TIMESTAMP_STR], time_array, TSDataType.INT64
+        )
+        total_length = len(time_array)
+
+        for i in range(len(column_values)):
+            column_name = column_name_list[i + 1]
+            location = column_ordinal_dict[column_name] - START_INDEX
+            # A value column named like the timestamp column maps to a
+            # negative location and carries no data of its own.
+            if location < 0:
+                continue
+
+            data_type = column_type_deduplicated_list[location]
+            null_indicator = null_indicators[location]
+            data_array = _decode_values(column_values[location], data_type)
+
+            # Non-boolean columns only serialize their non-null values, so a
+            # short array means nulls must be re-inserted. Boolean columns
+            # serialize every position and rely on the indicator alone.
+            if len(data_array) < total_length or (
+                data_type == TSDataType.BOOLEAN and null_indicator is not None
+            ):
+                data_array = _fill_nulls(
+                    data_array, data_type, null_indicator, total_length
+                )
+
+            result[column_name] = _append_chunk(
+                result[column_name], data_array, data_type
+            )
+
+    for column_name, values in result.items():
+        if values is None:
+            result[column_name] = []
+    return pd.DataFrame(result).reset_index(drop=True)
+
+
+def _to_native_byteorder(array):
+    """Return ``array`` with native byte order, converting big-endian data."""
+    if array.dtype.byteorder == ">":
+        # dtype.newbyteorder is used because ndarray.newbyteorder no longer
+        # exists in NumPy 2.
+        return array.astype(array.dtype.newbyteorder("="))
+    return array
+
+
+def _decode_values(value_buffer, data_type):
+    """Turn the raw values of one column into a numpy array."""
+    if data_type == TSDataType.BOOLEAN:
+        return np.array(list(value_buffer)).astype("bool")
+    if data_type == TSDataType.TEXT:
+        return np.array(
+            [value_bytes.decode("utf-8") for value_bytes in value_buffer],
+            dtype=object,
+        )
+
+    if data_type == TSDataType.DOUBLE:
+        scalar_type = np.double
+    elif data_type == TSDataType.FLOAT:
+        scalar_type = np.float32
+    elif data_type == TSDataType.INT32:
+        scalar_type = np.int32
+    elif data_type == TSDataType.INT64:
+        scalar_type = np.int64
+    else:
+        raise RuntimeError("unsupported data type {}.".format(data_type))
+
+    wire_dtype = np.dtype(scalar_type).newbyteorder(">")
+    return _to_native_byteorder(np.frombuffer(value_buffer, wire_dtype))
+
+
+def _fill_nulls(data_array, data_type, null_indicator, total_length):
+    """Expand ``data_array`` to ``total_length`` rows, inserting nulls."""
+    if null_indicator is None:
+        # Without an indicator no position can be filled: every row is null.
+        valid = np.zeros(total_length, dtype=bool)
+    else:
+        valid = ~np.asarray(null_indicator, dtype=bool)
+
+    if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
+        # Build the masked integer array directly; going through a float
+        # array would round integers that floats cannot represent exactly.
+        values = np.zeros(total_length, dtype=data_array.dtype)
+        if null_indicator is not None:
+            values[valid] = data_array
+        return pd.Series(pd.arrays.IntegerArray(values, ~valid))
+
+    if data_type == TSDataType.BOOLEAN:
+        values = np.zeros(total_length, dtype=bool)
+        if null_indicator is not None:
+            values[valid] = data_array[valid]
+        return pd.Series(pd.arrays.BooleanArray(values, ~valid))
+
+    if data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
+        filled = np.full(total_length, np.nan, data_array.dtype)
+    elif data_type == TSDataType.TEXT:
+        filled = np.full(total_length, None, dtype=data_array.dtype)
+    else:
+        raise Exception("Unsupported dataType in deserialization")
+
+    if null_indicator is not None:
+        filled[valid] = data_array
+    return filled
+
+
+def _as_nullable_series(values, data_type):
+    """Wrap ``values`` in a Series with the nullable dtype of ``data_type``."""
+    if isinstance(values, pd.Series):
+        return values
+    if data_type == TSDataType.INT32:
+        return pd.Series(values).astype("Int32")
+    if data_type == TSDataType.INT64:
+        return pd.Series(values).astype("Int64")
+    if data_type == TSDataType.BOOLEAN:
+        return pd.Series(values).astype("boolean")
+    raise RuntimeError("Series Error")
+
+
+def _append_chunk(existing, chunk, data_type):
+    """Append the rows of ``chunk`` to the rows accumulated in ``existing``."""
+    if existing is None:
+        return chunk
+    if isinstance(existing, pd.Series) or isinstance(chunk, pd.Series):
+        # As soon as one side is nullable both are promoted, so a chunk
+        # with nulls arriving after a chunk without nulls keeps its dtype.
+        # ignore_index keeps every accumulated Series on a plain 0..n-1
+        # index, which the final DataFrame construction relies on.
+        return pd.concat(
+            [
+                _as_nullable_series(existing, data_type),
+                _as_nullable_series(chunk, data_type),
+            ],
+            ignore_index=True,
+        )
+    return np.concatenate((existing, chunk), axis=0)
+
+
+# Serialized tsblock:
+# +-------------+---------------+---------+------------+-----------+----------+
+# | val col cnt | val col types | pos cnt | encodings  | time col  | val col  |
+# +-------------+---------------+---------+------------+-----------+----------+
+# |    int32    |  list[byte]   |  int32  | list[byte] |   bytes   |  bytes   |
+# +-------------+---------------+---------+------------+-----------+----------+
+
+def deserialize(buffer):
+    """Decode one serialized tsBlock into its raw column data.
+
+    Returns:
+        ``(time_column_values, column_values, null_indicators,
+        position_count)``: the raw bytes of the time column, one entry of
+        raw values per value column, one null-indicator entry per value
+        column (``None`` when the column reports no nulls) and the number
+        of rows in the block.
+    """
+    value_column_count, buffer = read_int_from_buffer(buffer)
+    data_types, buffer = read_column_types(buffer, value_column_count)
+    position_count, buffer = read_int_from_buffer(buffer)
+    # One encoding byte for the time column, then one per value column.
+    column_encodings, buffer = read_column_encoding(buffer, value_column_count + 1)
+    time_column_values, buffer = read_time_column(buffer, position_count)
+
+    column_values = []
+    null_indicators = []
+    for encoding, data_type in zip(column_encodings[1:], data_types):
+        values, indicator, buffer = read_column(
+            encoding, buffer, data_type, position_count
+        )
+        column_values.append(values)
+        null_indicators.append(indicator)
+
+    return time_column_values, column_values, null_indicators, position_count
+
+
+# General Methods
+
+def read_int_from_buffer(buffer):
+    """Read the leading 4 bytes of ``buffer`` as a big-endian unsigned int.
+
+    Returns:
+        ``(value, remaining)`` where ``remaining`` is ``buffer`` without the
+        bytes that were consumed.
+    """
+    raw, remaining = read_from_buffer(buffer, 4)
+    value = int.from_bytes(raw, "big")
+    return value, remaining
+
+
+def read_byte_from_buffer(buffer):
+    """Split off the first byte of ``buffer``; return ``(byte, remaining)``."""
+    first_byte, remaining = read_from_buffer(buffer, 1)
+    return first_byte, remaining
+
+
+def read_from_buffer(buffer, size):
+    """Split ``buffer`` after ``size`` items; return ``(head, remaining)``.
+
+    A buffer shorter than ``size`` yields a short head and an empty rest.
+    """
+    return buffer[:size], buffer[size:]
+
+
+# Read ColumnType
+
+def read_column_types(buffer, value_column_count):
+    """Read ``value_column_count`` one-byte type codes from ``buffer``.
+
+    Returns:
+        ``(data_types, remaining)`` where ``data_types`` holds the result of
+        ``get_dataType`` for each code, in the order they were read.
+    """
+    remaining = buffer
+    data_types = []
+    for _ in range(value_column_count):
+        type_code, remaining = read_byte_from_buffer(remaining)
+        data_types.append(get_dataType(type_code))
+    return data_types, remaining
+
+
+def get_dataType(value):
+    """Map a one-byte type code to the matching ``TSDataType`` member.
+
+    Args:
+        value: a single byte (``b'\\x00'`` .. ``b'\\x06'``).
+
+    Returns:
+        The ``TSDataType`` member for the code.
+
+    Raises:
+        ValueError: if ``value`` is not a known type code (including an
+            empty value read from an exhausted buffer). Failing here gives a
+            clear error instead of a ``None`` type that breaks later.
+    """
+    type_names = {
+        b'\x00': "BOOLEAN",
+        b'\x01': "INT32",
+        b'\x02': "INT64",
+        b'\x03': "FLOAT",
+        b'\x04': "DOUBLE",
+        b'\x05': "TEXT",
+        b'\x06': "VECTOR",
+    }
+    type_name = type_names.get(bytes(value))
+    if type_name is None:
+        raise ValueError("Unknown data type code: {!r}".format(value))
+    # Resolved lazily so that a member is only looked up when its code
+    # actually appears in the data.
+    return getattr(TSDataType, type_name)
+
+
+# Read ColumnEncodings
+
+def read_column_encoding(buffer, size):
+    """Read ``size`` one-byte encoding markers from ``buffer``.
+
+    Returns:
+        ``(encodings, remaining)`` where each encoding is kept as the raw
+        single byte that was read.
+    """
+    remaining = buffer
+    encodings = []
+    for _ in range(size):
+        marker, remaining = read_byte_from_buffer(remaining)
+        encodings.append(marker)
+    return encodings, remaining
+
+
+# Read Column
+
+def deserialize_null_indicators(buffer, size):
+    """Read the optional null-indicator section of a column.
+
+    The section starts with a "may have null" byte. When that byte is
+    ``b'\\x00'`` there are no indicators and ``None`` is returned for them;
+    otherwise ``size`` packed booleans follow.
+
+    Returns:
+        ``(null_indicators, remaining)``.
+    """
+    may_have_null, remaining = read_byte_from_buffer(buffer)
+    if may_have_null == b'\x00':
+        return None, remaining
+    return deserialize_from_boolean_array(remaining, size)
+
+
+# Serialized data layout:
+# +---------------+-----------------+-------------+
+# | may have null | null indicators |   values    |
+# +---------------+-----------------+-------------+
+# |     byte      |   list[byte]    | list[int64] |
+# +---------------+-----------------+-------------+
+
+def read_time_column(buffer, size):
+    """Read the time column: ``size`` timestamps of 8 bytes each.
+
+    Returns:
+        ``(values, remaining)`` where ``values`` is the raw byte string.
+
+    Raises:
+        Exception: if the column carries null indicators.
+    """
+    null_indicators, remaining = deserialize_null_indicators(buffer, size)
+    if null_indicators is not None:
+        raise Exception("TimeColumn should not contains null value")
+    return read_from_buffer(remaining, size * 8)
+
+
+def read_INT64_column(buffer, data_type, position_count):
+    """Read a column whose values take 8 bytes each (INT64 or DOUBLE).
+
+    Returns:
+        ``(values, null_indicators, remaining)``. ``values`` is the raw byte
+        string of the non-null entries only. ``null_indicators`` is ``None``
+        when the column reports no nulls, otherwise a list of booleans in
+        which ``True`` marks a null position.
+
+    Raises:
+        TypeError: if ``data_type`` is neither INT64 nor DOUBLE.
+    """
+    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
+    if data_type not in (TSDataType.INT64, TSDataType.DOUBLE):
+        # format() is required: data_type is not a str, so concatenating it
+        # to the message would fail before the message is ever built.
+        raise TypeError("Invalid data type: {}".format(data_type))
+
+    if null_indicators is None:
+        value_count = position_count
+    else:
+        value_count = null_indicators.count(False)
+    values, buffer = read_from_buffer(buffer, value_count * 8)
+    return values, null_indicators, buffer
+
+
+# Serialized data layout:
+# +---------------+-----------------+-------------+
+# | may have null | null indicators |   values    |
+# +---------------+-----------------+-------------+
+# |     byte      |   list[byte]    | list[int32] |
+# +---------------+-----------------+-------------+
+
+def read_Int32_column(buffer, data_type, position_count):
+    """Read a column whose values take 4 bytes each (INT32 or FLOAT).
+
+    Returns:
+        ``(values, null_indicators, remaining)``. ``values`` is the raw byte
+        string of the non-null entries only. ``null_indicators`` is ``None``
+        when the column reports no nulls, otherwise a list of booleans in
+        which ``True`` marks a null position.
+
+    Raises:
+        TypeError: if ``data_type`` is neither INT32 nor FLOAT.
+    """
+    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
+    if data_type not in (TSDataType.INT32, TSDataType.FLOAT):
+        # format() is required: data_type is not a str, so concatenating it
+        # to the message would fail before the message is ever built.
+        raise TypeError("Invalid data type: {}".format(data_type))
+
+    if null_indicators is None:
+        value_count = position_count
+    else:
+        value_count = null_indicators.count(False)
+    values, buffer = read_from_buffer(buffer, value_count * 4)
+    return values, null_indicators, buffer
+
+
+# Serialized data layout:
+# +---------------+-----------------+-------------+
+# | may have null | null indicators |   values    |
+# +---------------+-----------------+-------------+
+# |     byte      |   list[byte]    | list[byte]  |
+# +---------------+-----------------+-------------+
+
+def read_byte_column(buffer, data_type, position_count):
+    """Read a BOOLEAN column stored as packed bits.
+
+    Unlike the numeric readers, one value is read for every position, so
+    the returned list always has ``position_count`` entries.
+
+    Returns:
+        ``(values, null_indicators, remaining)`` where ``values`` is a list
+        of booleans and ``null_indicators`` is ``None`` or a list of
+        booleans.
+
+    Raises:
+        TypeError: if ``data_type`` is not BOOLEAN.
+    """
+    if data_type != TSDataType.BOOLEAN:
+        # format() is required: data_type is not a str, so concatenating it
+        # to the message would fail before the message is ever built.
+        raise TypeError("Invalid data type: {}".format(data_type))
+    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
+    values, buffer = deserialize_from_boolean_array(buffer, position_count)
+    return values, null_indicators, buffer
+
+
+def deserialize_from_boolean_array(buffer, size):
+    """Unpack ``size`` booleans stored as bits, most significant bit first.
+
+    ``(size + 7) // 8`` bytes are consumed from ``buffer``.
+
+    Returns:
+        ``(booleans, remaining)`` where ``booleans`` has ``size`` entries.
+    """
+    packed, remaining = read_from_buffer(buffer, (size + 7) // 8)
+    full_bytes, tail_bits = divmod(size, 8)
+
+    booleans = []
+    # Bytes in which all eight bits are in use.
+    for byte_index in range(full_bytes):
+        value = packed[byte_index]
+        booleans.extend(
+            bool((value >> shift) & 1) for shift in range(7, -1, -1)
+        )
+
+    # Trailing byte of which only the top ``tail_bits`` bits are in use.
+    if tail_bits > 0:
+        value = packed[-1]
+        booleans.extend(
+            bool((value >> (7 - offset)) & 1) for offset in range(tail_bits)
+        )
+    return booleans, remaining
+
+
+# Serialized data layout:
+# +---------------+-----------------+-------------+
+# | may have null | null indicators |   values    |
+# +---------------+-----------------+-------------+
+# |     byte      |   list[byte]    | list[entry] |
+# +---------------+-----------------+-------------+
+#
+# Each entry is represented as:
+# +---------------+-------+
+# | value length  | value |
+# +---------------+-------+
+# |     int32     | bytes |
+# +---------------+-------+
+
+def read_binary_column(buffer, data_type, position_count):
+    """Read a TEXT column made of length-prefixed byte strings.
+
+    Returns:
+        ``(values, null_indicators, remaining)``. ``values`` is a list with
+        one undecoded byte string per non-null entry. ``null_indicators`` is
+        ``None`` when the column reports no nulls, otherwise a list of
+        booleans in which ``True`` marks a null position.
+
+    Raises:
+        TypeError: if ``data_type`` is not TEXT.
+    """
+    if data_type != TSDataType.TEXT:
+        # format() is required: data_type is not a str, so concatenating it
+        # to the message would fail before the message is ever built.
+        raise TypeError("Invalid data type: {}".format(data_type))
+    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
+
+    if null_indicators is None:
+        value_count = position_count
+    else:
+        value_count = null_indicators.count(False)
+
+    values = []
+    for _ in range(value_count):
+        length, buffer = read_int_from_buffer(buffer)
+        value, buffer = read_from_buffer(buffer, length)
+        values.append(value)
+    return values, null_indicators, buffer
+
+
+def read_column(encoding, buffer, data_type, position_count):
+    """Read one value column with the reader selected by ``encoding``.
+
+    Args:
+        encoding: the one-byte encoding marker of the column.
+        buffer: serialized data starting at this column.
+        data_type: ``TSDataType`` of the column.
+        position_count: number of rows in the column.
+
+    Returns:
+        ``(values, null_indicators, remaining)`` as produced by the selected
+        reader.
+
+    Raises:
+        TypeError: if ``encoding`` is not a supported marker.
+    """
+    if encoding == b'\x00':
+        reader = read_byte_column
+    elif encoding == b'\x01':
+        reader = read_Int32_column
+    elif encoding == b'\x02':
+        reader = read_INT64_column
+    elif encoding == b'\x03':
+        reader = read_binary_column
+    elif encoding == b'\x04':
+        reader = read_runLength_column
+    else:
+        # encoding is a bytes object; it has to be formatted, not
+        # concatenated, to end up in the message.
+        raise TypeError("Unsupported encoding: {!r}".format(encoding))
+    return reader(buffer, data_type, position_count)
+
+
+# Serialized data layout:
+# +-----------+-------------------------+
+# | encoding  | serialized inner column |
+# +-----------+-------------------------+
+# |   byte    |       list[byte]        |
+# +-----------+-------------------------+
+
+def read_runLength_column(buffer, data_type, position_count):
+    """Read a run-length column: one stored value repeated for every row.
+
+    The column consists of an inner encoding byte followed by an inner
+    column of exactly one position, which is expanded to ``position_count``
+    rows.
+
+    Returns:
+        ``(values, null_indicators, remaining)`` in the same shape the inner
+        reader would produce for ``position_count`` rows.
+    """
+    encoding, buffer = read_byte_from_buffer(buffer)
+    column, null_indicators, buffer = read_column(encoding, buffer, data_type, 1)
+
+    # ``None`` means "no nulls" and must stay ``None``; only a real
+    # indicator list can be repeated.
+    if null_indicators is not None:
+        null_indicators = null_indicators * position_count
+    return repeat(column, data_type, position_count), null_indicators, buffer
+
+
+def repeat(buffer, data_type, position_count):
+    """Return the content of ``buffer`` repeated ``position_count`` times.
+
+    Args:
+        buffer: a list of values (BOOLEAN/TEXT columns) or the raw bytes of
+            the stored value (numeric columns).
+        data_type: ``TSDataType`` of the column; kept for interface
+            compatibility, the container type alone decides how to repeat.
+        position_count: number of repetitions.
+
+    Returns:
+        A list when ``buffer`` is a list, otherwise a ``bytes`` object.
+    """
+    if isinstance(buffer, list):
+        return buffer * position_count
+    return bytes(buffer) * position_count
diff --git a/mlnode/requirements.txt b/mlnode/requirements.txt
new file mode 100644
index 0000000000..b5a8578e24
--- /dev/null
+++ b/mlnode/requirements.txt
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pandas>=1.3.5
+numpy>=1.21.4
+apache-iotdb
+poetry
diff --git a/mlnode/requirements_dev.txt b/mlnode/requirements_dev.txt
new file mode 100644
index 0000000000..e9a9f4bb38
--- /dev/null
+++ b/mlnode/requirements_dev.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+-r requirements.txt
+# Pytest to run tests
+pytest==7.2.0
diff --git a/mlnode/test/test_serde.py b/mlnode/test/test_serde.py
new file mode 100644
index 0000000000..c7ff5c49b9
--- /dev/null
+++ b/mlnode/test/test_serde.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import random
+import time
+import numpy as np
+import pandas as pd
+from iotdb.mlnode.serde import convert_to_df
+from pandas.testing import assert_frame_equal
+
+# NOTE(review): device_id is not referenced by the tests visible in this file.
+device_id = "root.wt1"
+
+# Full timeseries paths; the tests use them as the column names of the
+# expected DataFrames.
+ts_path_lst = [
+ "root.wt1.temperature",
+ "root.wt1.windspeed",
+ "root.wt1.angle",
+ "root.wt1.altitude",
+ "root.wt1.status",
+ "root.wt1.hardware",
+]
+# Last path segment of each entry in ts_path_lst, in the same order.
+# NOTE(review): not referenced by the tests visible in this file.
+measurements = [
+ "temperature",
+ "windspeed",
+ "angle",
+ "altitude",
+ "status",
+ "hardware",
+]
+
+simple_binary = [
+ b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x14\x02\x02\x01\x03\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00\x00\x [...]
+binary_with_null = \
+ [
+ b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x13\x02\x02\x01\x03\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00\x [...]
+ b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x01\x02\x02\x01\x03\x04\x02\x04\x00\x00\x00\x00\x00\x00\x00\x00\x13\x00?\xd3\xc7\xae#\x86\xe1j\x00?\x10\xea!\x00\x00\x00\x00\x05text1\x00\x01\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00?\x01\x01\x80']
+# Arguments shared by both tests when calling convert_to_df:
+# column_names is passed as name_list and data_type_list as the parallel
+# type_list (TSDataType member names).
+column_names = ['root.wt1.altitude', 'root.wt1.temperature', 'root.wt1.angle', 'root.wt1.windspeed',
+ 'root.wt1.hardware', 'root.wt1.status']
+data_type_list = ['INT64', 'FLOAT', 'INT32', 'DOUBLE', 'TEXT', 'BOOLEAN']
+# Passed as name_index: maps each column name to the position of its data
+# inside every serialized tsBlock.
+column_name_index = {'root.wt1.windspeed': 0, 'root.wt1.temperature': 1, 'root.wt1.hardware': 2, 'root.wt1.status': 3,
+ 'root.wt1.altitude': 4, 'root.wt1.angle': 5}
+
+
+def test_simple_query():
+ # Checks that convert_to_df turns simple_binary into the DataFrame that
+ # is built by hand below (20 rows, no missing values).
+ # test data
+ data_nums = 20
+ data = {}
+ timestamps = np.arange(data_nums, dtype="int64")
+ data[ts_path_lst[0]] = \
+ [0.38830405, 0.13355169, 0.09433287, 0.31279156, 0.8401496,
+ 0.28991386, 0.06868938, 0.46327657, 0.18984757, 0.93103373,
+ 0.6931865, 0.57700926, 0.7871424, 0.71476793, 0.8190981,
+ 0.13309939, 0.93298537, 0.71830374, 0.4302311, 0.8690279]
+ data[ts_path_lst[1]] = \
+ [0.14837307, 0.29024481, 0.62946148, 0.79063227, 0.08269055,
+ 0.93563022, 0.48093016, 0.44585047, 0.92432981, 0.52788153,
+ 0.47100021, 0.02110239, 0.37659929, 0.37622376, 0.31649863,
+ 0.86574633, 0.48679356, 0.21717449, 0.27767639, 0.83347897]
+ data[ts_path_lst[2]] = \
+ [93, 64, 47, 23, 80, 79, 15, 43, 79, 18, 17, 58, 76, 45, 27, 87, 91, 25, 72, 95]
+ data[ts_path_lst[3]] = \
+ [92, 87, 13, 83, 71, 64, 83, 62, 17, 80, 23, 28, 93, 30, 21, 86, 81,
+ 45, 62, 89]
+ data[ts_path_lst[4]] = [False, True, False, True, True, False, True, True, True,
+ True, False, True, False, False, False, False, True, True,
+ True, False]
+ data[ts_path_lst[5]] = ['text1', 'text2', 'text2', 'text1', 'text1', 'text1', 'text2',
+ 'text2', 'text2', 'text1', 'text1', 'text1', 'text2', 'text2',
+ 'text1', 'text2', 'text1', 'text1', 'text1', 'text1']
+
+ # temperature is declared FLOAT and angle INT32 in data_type_list, so the
+ # expected columns are narrowed to the matching numpy dtypes.
+ data[ts_path_lst[0]] = np.array(data[ts_path_lst[0]], dtype="float32")
+ data[ts_path_lst[2]] = np.array(data[ts_path_lst[2]], dtype="int32")
+
+ df_input = pd.DataFrame(data)
+ df_input.insert(0, "Time", timestamps)
+
+ df_output = convert_to_df(column_names, data_type_list, column_name_index, simple_binary)
+ # Reorder the output columns to the expected order before comparing.
+ df_output = df_output[df_input.columns.tolist()]
+
+ assert_frame_equal(df_input, df_output)
+
+
+def test_with_null_query():
+ # Checks that convert_to_df turns binary_with_null (two serialized
+ # tsBlocks) into the hand-built DataFrame below, in which None marks a
+ # missing value.
+ # insert data
+
+ data_nums = 20
+ data = {}
+ timestamps = np.arange(data_nums, dtype="int64")
+ data[ts_path_lst[0]] = [0.049136366695165634, 0.4814307689666748, None, None, 0.3209269344806671,
+ 0.3143160343170166, 0.030140314251184464, None, 0.027547180652618408, 0.2688102126121521,
+ 0.9617357850074768, None, 0.30917245149612427, 0.4494488537311554, 0.41459938883781433,
+ None, 0.6761381030082703, 0.6640862822532654, 0.4531949460506439, 0.5660725235939026]
+ data[ts_path_lst[1]] = [0.5009131121558088, None, 0.26937357096243686, None, 0.3151998654674619, None, None, None,
+ 0.8431532356866694, 0.5903086429020434, None, None, 0.1315312090066042, None,
+ 0.7526326461335474, 0.9305597549133965, None, None, 0.8168103309315078, 0.30906251401349627]
+ data[ts_path_lst[2]] = [91, None, None, None, 11, 84, None, 80, 87, None, 17, None, None, 41, None, None, 60, 14,
+ 24, None]
+ data[ts_path_lst[3]] = [None, None, None, 66, None, None, None, 21, None, 14, None, None, 86, None, None, 62, None,
+ 35, None, 63]
+ data[ts_path_lst[4]] = [True, False, None, False, True, None, None, False, None, None, None, False, None, None,
+ True, False, False, None, False, None]
+ data[ts_path_lst[5]] = ['text1', None, 'text1', 'text2', 'text1', None, None, None, None, None, 'text1', None, None,
+ None, 'text2', 'text2', None, 'text2', None, 'text1']
+
+ # Integer and boolean columns with missing values are expected in the
+ # pandas nullable dtypes; temperature is expected as float32 with NaN.
+ data[ts_path_lst[0]] = pd.Series(data[ts_path_lst[0]]).astype("float32")
+ data[ts_path_lst[2]] = pd.Series(data[ts_path_lst[2]]).astype("Int32")
+ data[ts_path_lst[3]] = pd.Series(data[ts_path_lst[3]]).astype("Int64")
+ data[ts_path_lst[4]] = pd.Series(data[ts_path_lst[4]]).astype("boolean")
+
+ df_input = pd.DataFrame(data)
+ df_input.insert(0, "Time", timestamps)
+
+ df_output = convert_to_df(column_names, data_type_list, column_name_index, binary_with_null)
+ # Reorder the output columns to the expected order before comparing.
+ df_output = df_output[df_input.columns.tolist()]
+ assert_frame_equal(df_input, df_output)