You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/10 08:18:56 UTC

[iotdb] branch master updated: [IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 13c5ba0b73 [IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)
13c5ba0b73 is described below

commit 13c5ba0b7333c6fde9bf72e2f1fb63740cb41a41
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Mar 10 16:18:50 2023 +0800

    [IOTDB-5486] [IoTDB ML] The transformation between tsBlock(binary) and numpy.ndarray (#9096)
---
 .github/workflows/iotdb-ml.yml |  60 ++++++
 mlnode/iotdb/mlnode/serde.py   | 439 +++++++++++++++++++++++++++++++++++++++++
 mlnode/requirements.txt        |  22 +++
 mlnode/requirements_dev.txt    |  21 ++
 mlnode/test/test_serde.py      | 129 ++++++++++++
 5 files changed, 671 insertions(+)

diff --git a/.github/workflows/iotdb-ml.yml b/.github/workflows/iotdb-ml.yml
new file mode 100644
index 0000000000..e1fb4c0820
--- /dev/null
+++ b/.github/workflows/iotdb-ml.yml
@@ -0,0 +1,60 @@
+# This workflow checks that changes still build and pass the tests for the MLNode.
+
+name: MLNode
+
+on:
+  push:
+    branches:
+      - master
+      - 'rel/*'
+      - "new_*"
+    paths-ignore:
+      - 'docs/**'
+      - 'site/**'
+  pull_request:
+    branches:
+      - master
+      - 'rel/*'
+      - "new_*"
+    paths-ignore:
+      - 'docs/**'
+      - 'site/**'
+  # allow manually run the action:
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+  unix:
+    strategy:
+      fail-fast: false
+      max-parallel: 20
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Cache Maven packages
+        uses: actions/cache@v3
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2-
+      - name: Build IoTDB server distribution zip
+        run: mvn -B clean install -pl distribution -am -DskipTests
+      - name: Build IoTDB server docker image
+        run: |
+          docker build . -f docker/src/main/Dockerfile-1c1d -t "iotdb:dev"
+          docker images
+      - name: Install IoTDB mlnode requirements
+        run: pip3 install -r mlnode/requirements_dev.txt
+      - name: Build MLNode
+        run: mvn clean package -DskipUTs -pl mlnode -am && pip3 install poetry && cd mlnode && poetry build && pip install dist/apache_iotdb_mlnode-1.0.0-py3-none-any.whl --force-reinstall
+      - name: Integration test
+        shell: bash
+        run: |
+          pytest mlnode
diff --git a/mlnode/iotdb/mlnode/serde.py b/mlnode/iotdb/mlnode/serde.py
new file mode 100644
index 0000000000..4f491db606
--- /dev/null
+++ b/mlnode/iotdb/mlnode/serde.py
@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import numpy as np
+import pandas as pd
+from iotdb.utils.IoTDBConstants import TSDataType
+
# Name of the timestamp column in the DataFrame produced by convert_to_df.
# Column ordinals are 1-based with the timestamp column at ordinal 1, so the
# first value column sits at ordinal START_INDEX.
TIMESTAMP_STR, START_INDEX = "Time", 2
+
+
def _from_big_endian(buffer, dtype):
    """Decode big-endian ``buffer`` into a native-byte-order array of ``dtype``.

    ``ndarray.newbyteorder`` was removed in NumPy 2.0, so the conversion goes
    through ``astype`` on a byte-order-qualified dtype instead. ``astype`` copies,
    so the result is writable even though ``np.frombuffer`` returns a read-only view.
    """
    native_dtype = np.dtype(dtype)
    return np.frombuffer(buffer, native_dtype.newbyteorder(">")).astype(native_dtype)


def _decode_values(value_buffer, data_type):
    """Turn one column's deserialized payload into a NumPy array.

    Numeric payloads are big-endian bytes, BOOLEAN payloads are lists of bools and
    TEXT payloads are lists of UTF-8 encoded ``bytes`` (as produced by the
    ``read_*_column`` readers in this module).

    :raises RuntimeError: for a data type this module cannot decode.
    """
    if data_type == TSDataType.DOUBLE:
        return _from_big_endian(value_buffer, np.double)
    if data_type == TSDataType.FLOAT:
        return _from_big_endian(value_buffer, np.float32)
    if data_type == TSDataType.BOOLEAN:
        return np.array(list(value_buffer), dtype=bool)
    if data_type == TSDataType.INT32:
        return _from_big_endian(value_buffer, np.int32)
    if data_type == TSDataType.INT64:
        return _from_big_endian(value_buffer, np.int64)
    if data_type == TSDataType.TEXT:
        return np.array([raw.decode("utf-8") for raw in value_buffer], dtype=object)
    raise RuntimeError("unsupported data type {}.".format(data_type))


def _fill_nulls(data_array, data_type, null_indicator, total_length):
    """Expand ``data_array`` to ``total_length`` rows, marking null rows as missing.

    ``null_indicator`` is a per-row sequence of bools (True = null) or None. Except
    for BOOLEAN, ``data_array`` holds only the non-null values; BOOLEAN columns carry
    one value per row and are masked row by row. Without an indicator every row is
    treated as missing.

    INT32/INT64/BOOLEAN results are built directly as pandas nullable arrays so that
    integers never pass through floating point (a float32 staging array would round
    any value above 2**24). FLOAT/DOUBLE nulls become NaN and TEXT nulls None.
    """
    if null_indicator is None:
        null_mask = np.ones(total_length, dtype=bool)
        present_values = data_array[:0]
    else:
        null_mask = np.asarray(null_indicator, dtype=bool)
        if data_type == TSDataType.BOOLEAN:
            present_values = data_array[~null_mask]
        else:
            present_values = data_array
    present = ~null_mask

    if data_type in (TSDataType.INT32, TSDataType.INT64):
        values = np.zeros(total_length, dtype=data_array.dtype)
        values[present] = present_values
        # dtype of ``values`` (int32/int64) selects Int32/Int64
        return pd.Series(pd.arrays.IntegerArray(values, null_mask))
    if data_type == TSDataType.BOOLEAN:
        values = np.zeros(total_length, dtype=bool)
        values[present] = present_values
        return pd.Series(pd.arrays.BooleanArray(values, null_mask))

    if data_type in (TSDataType.FLOAT, TSDataType.DOUBLE):
        filled = np.full(total_length, np.nan, dtype=data_array.dtype)
    elif data_type == TSDataType.TEXT:
        filled = np.full(total_length, None, dtype=object)
    else:
        raise Exception("Unsupported dataType in deserialization")
    filled[present] = present_values
    return filled


def _to_nullable_series(values, data_type):
    """Return ``values`` as a Series with the nullable dtype matching ``data_type``.

    Series pass through unchanged. Only INT32/INT64/BOOLEAN columns are ever held as
    Series by ``convert_to_df``; any other type raises ``RuntimeError``.
    """
    if isinstance(values, pd.Series):
        return values
    if data_type == TSDataType.INT32:
        return pd.Series(values).astype("Int32")
    if data_type == TSDataType.INT64:
        return pd.Series(values).astype("Int64")
    if data_type == TSDataType.BOOLEAN:
        return pd.Series(values).astype("boolean")
    raise RuntimeError("Series Error")


def convert_to_df(name_list, type_list, name_index, binary_list):
    """Decode serialized TsBlocks into a pandas DataFrame.

    :param name_list: output column names in result-set order; duplicates are allowed
        and resolve to the same value column.
    :param type_list: TSDataType member names (e.g. ``"INT64"``), parallel to
        ``name_list``.
    :param name_index: optional mapping from column name to its position among the
        de-duplicated value columns of each TsBlock; when None, positions follow the
        first occurrence of each name in ``name_list``.
    :param binary_list: serialized TsBlocks, decoded in order and concatenated row-wise.
    :return: DataFrame with a ``"Time"`` column followed by one column per name.
        INT32/INT64/BOOLEAN columns that contain nulls use pandas' nullable
        ``Int32``/``Int64``/``boolean`` dtypes.
    """
    column_name_list = [TIMESTAMP_STR]
    column_ordinal_dict = {TIMESTAMP_STR: 1}

    if name_index is not None:
        column_type_deduplicated_list = [None] * len(name_index)
        for i, name in enumerate(name_list):
            data_type = TSDataType[type_list[i]]
            column_name_list.append(name)
            if name not in column_ordinal_dict:
                index = name_index[name]
                column_ordinal_dict[name] = index + START_INDEX
                column_type_deduplicated_list[index] = data_type
    else:
        column_type_deduplicated_list = []
        for i, name in enumerate(name_list):
            data_type = TSDataType[type_list[i]]
            column_name_list.append(name)
            if name not in column_ordinal_dict:
                column_ordinal_dict[name] = len(column_type_deduplicated_list) + START_INDEX
                column_type_deduplicated_list.append(data_type)

    result = dict.fromkeys(column_name_list)

    for buffer in binary_list:
        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
        time_array = _from_big_endian(time_column_values, np.longlong)
        if result[TIMESTAMP_STR] is None:
            result[TIMESTAMP_STR] = time_array
        else:
            result[TIMESTAMP_STR] = np.concatenate((result[TIMESTAMP_STR], time_array), axis=0)
        total_length = len(time_array)

        for i in range(len(column_values)):
            column_name = column_name_list[i + 1]
            location = column_ordinal_dict[column_name] - START_INDEX
            if location < 0:
                continue
            data_type = column_type_deduplicated_list[location]
            data_array = _decode_values(column_values[location], data_type)

            null_indicator = null_indicators[location]
            # Non-BOOLEAN columns only carry non-null values, so a short array means
            # nulls; BOOLEAN columns carry every row and need the indicator itself.
            if len(data_array) < total_length or (
                data_type == TSDataType.BOOLEAN and null_indicator is not None
            ):
                data_array = _fill_nulls(data_array, data_type, null_indicator, total_length)

            previous = result[column_name]
            if previous is None:
                result[column_name] = data_array
            elif isinstance(previous, pd.Series) or isinstance(data_array, pd.Series):
                # pd.concat replaces Series.append (removed in pandas 2.0); converting
                # both sides keeps the nullable dtype instead of degrading to object.
                result[column_name] = pd.concat(
                    [
                        _to_nullable_series(previous, data_type),
                        _to_nullable_series(data_array, data_type),
                    ],
                    ignore_index=True,
                )
            else:
                result[column_name] = np.concatenate((previous, data_array), axis=0)

    frame = pd.DataFrame(
        {name: ([] if values is None else values) for name, values in result.items()}
    )
    return frame.reset_index(drop=True)
+
+
# Serialized tsblock:
#    +-------------+---------------+---------+------------+-----------+-----------+
#    | val col cnt | val col types | pos cnt | encodings  | time col  | val cols  |
#    +-------------+---------------+---------+------------+-----------+-----------+
#    | int32       | list[byte]    | int32   | list[byte] | bytes     | bytes     |
#    +-------------+---------------+---------+------------+-----------+-----------+

def deserialize(buffer):
    """Split one serialized TsBlock into its time column and value columns.

    Returns ``(time_column_values, column_values, null_indicators, position_count)``
    where ``column_values[i]`` and ``null_indicators[i]`` describe the i-th value
    column. The encodings list has one entry per column including the leading time
    column, so value-column encodings start at index 1.
    """
    column_count, rest = read_int_from_buffer(buffer)
    data_types, rest = read_column_types(rest, column_count)
    position_count, rest = read_int_from_buffer(rest)
    encodings, rest = read_column_encoding(rest, column_count + 1)
    time_column_values, rest = read_time_column(rest, position_count)

    column_values = []
    null_indicators = []
    for encoding, data_type in zip(encodings[1:], data_types):
        values, nulls, rest = read_column(encoding, rest, data_type, position_count)
        column_values.append(values)
        null_indicators.append(nulls)

    return time_column_values, column_values, null_indicators, position_count
+
+
+# General Methods
+
def read_int_from_buffer(buffer):
    """Consume a 4-byte big-endian unsigned integer; return ``(value, rest)``."""
    head, rest = buffer[:4], buffer[4:]
    return int.from_bytes(head, byteorder="big"), rest
+
+
def read_byte_from_buffer(buffer):
    """Consume one byte; return ``(byte, rest)`` with ``byte`` as a length-1 slice (empty once exhausted)."""
    first, remainder = buffer[:1], buffer[1:]
    return first, remainder
+
+
def read_from_buffer(buffer, size):
    """Split ``buffer`` after ``size`` items and return ``(head, tail)``.

    A buffer shorter than ``size`` yields a short head and an empty tail.
    """
    return buffer[:size], buffer[size:]
+
+
+# Read ColumnType
+
def read_column_types(buffer, value_column_count):
    """Decode ``value_column_count`` one-byte type codes; return ``(types, rest)``."""
    codes = [buffer[pos:pos + 1] for pos in range(value_column_count)]
    return [get_dataType(code) for code in codes], buffer[len(codes):]
+
+
def get_dataType(value):
    """Map a one-byte TSDataType code (``b'\\x00'`` .. ``b'\\x06'``) to its ``TSDataType``.

    :raises ValueError: for an unrecognized code. Returning None here only led to a
        confusing failure further down the decoding pipeline.
    """
    # Members are looked up only for the matching code, so the chain never touches
    # enum members it does not need.
    if value == b'\x00':
        return TSDataType.BOOLEAN
    if value == b'\x01':
        return TSDataType.INT32
    if value == b'\x02':
        return TSDataType.INT64
    if value == b'\x03':
        return TSDataType.FLOAT
    if value == b'\x04':
        return TSDataType.DOUBLE
    if value == b'\x05':
        return TSDataType.TEXT
    if value == b'\x06':
        return TSDataType.VECTOR
    raise ValueError("Unsupported data type code: {!r}".format(value))
+
+
+# Read ColumnEncodings
+
def read_column_encoding(buffer, size):
    """Read ``size`` one-byte column encodings; return ``(encodings, rest)``.

    Each encoding is a length-1 ``bytes`` slice (empty once the buffer runs out).
    """
    encodings = [buffer[offset:offset + 1] for offset in range(size)]
    return encodings, buffer[len(encodings):]
+
+
+# Read Column
+
def deserialize_null_indicators(buffer, size):
    """Read the "may have null" flag byte and, when set, ``size`` packed null bits.

    Returns ``(null_indicators, rest)``: None when the flag byte is ``0``, otherwise
    a list of bools where True marks a null position.
    """
    flag, rest = buffer[:1], buffer[1:]
    if flag == b'\x00':
        return None, rest
    return deserialize_from_boolean_array(rest, size)
+
+
# Serialized data layout:
#    +---------------+-----------------+-------------+
#    | may have null | null indicators |   values    |
#    +---------------+-----------------+-------------+
#    | byte          | list[byte]      | list[int64] |
#    +---------------+-----------------+-------------+

def read_time_column(buffer, size):
    """Read the time column: ``size`` 8-byte timestamps, returned as raw bytes.

    The time column may not contain nulls; an exception is raised if its null
    indicators are present.
    """
    null_indicators, rest = deserialize_null_indicators(buffer, size)
    if null_indicators is not None:
        raise Exception("TimeColumn should not contains null value")
    return read_from_buffer(rest, size * 8)
+
+
def read_INT64_column(buffer, data_type, position_count):
    """Read an 8-byte-per-value column (INT64 or DOUBLE).

    Returns ``(values, null_indicators, rest)`` where ``values`` holds the raw bytes
    of the non-null entries only, 8 bytes each.

    :raises ValueError: if ``data_type`` is not INT64 or DOUBLE.
    """
    if data_type not in (TSDataType.INT64, TSDataType.DOUBLE):
        # format() instead of '+': concatenating str with a TSDataType raises TypeError
        raise ValueError("Invalid data type: {}".format(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    size = position_count if null_indicators is None else null_indicators.count(False)
    values, buffer = read_from_buffer(buffer, size * 8)
    return values, null_indicators, buffer
+
+
# Serialized data layout:
#    +---------------+-----------------+-------------+
#    | may have null | null indicators |   values    |
#    +---------------+-----------------+-------------+
#    | byte          | list[byte]      | list[int32] |
#    +---------------+-----------------+-------------+

def read_Int32_column(buffer, data_type, position_count):
    """Read a 4-byte-per-value column (INT32 or FLOAT).

    Returns ``(values, null_indicators, rest)`` where ``values`` holds the raw bytes
    of the non-null entries only, 4 bytes each.

    :raises ValueError: if ``data_type`` is not INT32 or FLOAT.
    """
    if data_type not in (TSDataType.INT32, TSDataType.FLOAT):
        # format() instead of '+': concatenating str with a TSDataType raises TypeError
        raise ValueError("Invalid data type: {}".format(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    size = position_count if null_indicators is None else null_indicators.count(False)
    values, buffer = read_from_buffer(buffer, size * 4)
    return values, null_indicators, buffer
+
+
# Serialized data layout:
#    +---------------+-----------------+-------------+
#    | may have null | null indicators |   values    |
#    +---------------+-----------------+-------------+
#    | byte          | list[byte]      | list[byte]  |
#    +---------------+-----------------+-------------+
# Values are bit-packed (most significant bit first), one bit for every position,
# null or not.

def read_byte_column(buffer, data_type, position_count):
    """Read a BOOLEAN column.

    Returns ``(values, null_indicators, rest)`` where ``values`` is a list of
    ``position_count`` bools (null positions included).

    :raises ValueError: if ``data_type`` is not BOOLEAN.
    """
    if data_type != TSDataType.BOOLEAN:
        # format() instead of '+': concatenating str with a TSDataType raises TypeError
        raise ValueError("Invalid data type: {}".format(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    values, buffer = deserialize_from_boolean_array(buffer, position_count)
    return values, null_indicators, buffer
+
+
def deserialize_from_boolean_array(buffer, size):
    """Unpack ``size`` bit-packed booleans from the head of ``buffer``.

    Bits are read most-significant first within each byte, and ``ceil(size / 8)``
    bytes are consumed. Returns ``(bools, rest)``.

    :raises ValueError: if ``buffer`` holds fewer bytes than ``size`` bits need.
        Truncated input used to be decoded silently from the wrong byte.
    """
    byte_count = (size + 7) // 8
    packed, rest = buffer[:byte_count], buffer[byte_count:]
    if len(packed) < byte_count:
        raise ValueError(
            "Truncated boolean array: expected {} bytes, got {}".format(byte_count, len(packed))
        )
    # bit i lives in byte i // 8, at mask 0x80 >> (i % 8)
    bools = [(packed[i >> 3] & (0x80 >> (i & 0b111))) != 0 for i in range(size)]
    return bools, rest
+
+
# Serialized data layout:
#    +---------------+-----------------+-------------+
#    | may have null | null indicators |   values    |
#    +---------------+-----------------+-------------+
#    | byte          | list[byte]      | list[entry] |
#    +---------------+-----------------+-------------+
#
# Each entry is represented as:
#    +---------------+-------+
#    | value length  | value |
#    +---------------+-------+
#    | int32         | bytes |
#    +---------------+-------+

def read_binary_column(buffer, data_type, position_count):
    """Read a TEXT column of length-prefixed entries.

    Returns ``(values, null_indicators, rest)`` where ``values`` is a list of raw
    ``bytes`` for the non-null entries only.

    :raises ValueError: if ``data_type`` is not TEXT.
    """
    if data_type != TSDataType.TEXT:
        # format() instead of '+': concatenating str with a TSDataType raises TypeError
        raise ValueError("Invalid data type: {}".format(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    size = position_count if null_indicators is None else null_indicators.count(False)
    values = []
    for _ in range(size):
        length, buffer = read_int_from_buffer(buffer)
        value, buffer = read_from_buffer(buffer, length)
        values.append(value)
    return values, null_indicators, buffer
+
+
def read_column(encoding, buffer, data_type, position_count):
    """Dispatch on the one-byte column ``encoding`` to the matching column reader.

    Returns ``(values, null_indicators, rest)`` as produced by that reader.

    :raises ValueError: for an unknown encoding byte.
    """
    if encoding == b'\x00':
        return read_byte_column(buffer, data_type, position_count)
    if encoding == b'\x01':
        return read_Int32_column(buffer, data_type, position_count)
    if encoding == b'\x02':
        return read_INT64_column(buffer, data_type, position_count)
    if encoding == b'\x03':
        return read_binary_column(buffer, data_type, position_count)
    if encoding == b'\x04':
        return read_runLength_column(buffer, data_type, position_count)
    # {!r} formatting: concatenating str with bytes raised TypeError instead of this message
    raise ValueError("Unsupported encoding: {!r}".format(encoding))
+
+
# Serialized data layout:
#    +-----------+-------------------------+
#    | encoding  | serialized inner column |
#    +-----------+-------------------------+
#    | byte      | list[byte]              |
#    +-----------+-------------------------+
#
# The inner column holds a single position, repeated position_count times.

def read_runLength_column(buffer, data_type, position_count):
    """Read a run-length encoded column: one inner value repeated ``position_count`` times.

    Returns ``(values, null_indicators, rest)``. ``null_indicators`` stays None when
    the inner column has no nulls (multiplying None by the run length raised TypeError).
    """
    encoding, buffer = read_byte_from_buffer(buffer)
    column, null_indicators, buffer = read_column(encoding, buffer, data_type, 1)
    if null_indicators is not None:
        null_indicators = null_indicators * position_count
    return repeat(column, data_type, position_count), null_indicators, buffer
+
+
def repeat(buffer, data_type, position_count):
    """Repeat a single decoded column value ``position_count`` times.

    ``buffer`` is either a list (BOOLEAN/TEXT readers) or raw bytes (numeric
    readers). Sequence repetition handles both, so ``data_type`` no longer changes
    the result and is kept only for interface compatibility. The old numeric branch
    discarded the result of ``bytes.join`` and failed on any non-empty value.
    """
    return buffer * position_count
diff --git a/mlnode/requirements.txt b/mlnode/requirements.txt
new file mode 100644
index 0000000000..b5a8578e24
--- /dev/null
+++ b/mlnode/requirements.txt
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pandas>=1.3.5
+numpy>=1.21.4
+apache-iotdb
+poetry
diff --git a/mlnode/requirements_dev.txt b/mlnode/requirements_dev.txt
new file mode 100644
index 0000000000..e9a9f4bb38
--- /dev/null
+++ b/mlnode/requirements_dev.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+-r requirements.txt
+# Pytest to run tests
+pytest==7.2.0
diff --git a/mlnode/test/test_serde.py b/mlnode/test/test_serde.py
new file mode 100644
index 0000000000..c7ff5c49b9
--- /dev/null
+++ b/mlnode/test/test_serde.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import random
+import time
+import numpy as np
+import pandas as pd
+from iotdb.mlnode.serde import convert_to_df
+from pandas.testing import assert_frame_equal
+
device_id = "root.wt1"

measurements = [
    "temperature",
    "windspeed",
    "angle",
    "altitude",
    "status",
    "hardware",
]
# Full series paths: the device prefix joined with each measurement, in the same order.
ts_path_lst = ["{}.{}".format(device_id, measurement) for measurement in measurements]
+
+simple_binary = [
+    b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x14\x02\x02\x01\x03\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00\x00\x [...]
+binary_with_null = \
+    [
+        b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x13\x02\x02\x01\x03\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00\x [...]
+        b'\x00\x00\x00\x06\x04\x03\x05\x00\x02\x01\x00\x00\x00\x01\x02\x02\x01\x03\x04\x02\x04\x00\x00\x00\x00\x00\x00\x00\x00\x13\x00?\xd3\xc7\xae#\x86\xe1j\x00?\x10\xea!\x00\x00\x00\x00\x05text1\x00\x01\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00?\x01\x01\x80']
# Result-set column names and their TSDataType names, paired by position.
column_names = [
    'root.wt1.altitude',
    'root.wt1.temperature',
    'root.wt1.angle',
    'root.wt1.windspeed',
    'root.wt1.hardware',
    'root.wt1.status',
]
data_type_list = ['INT64', 'FLOAT', 'INT32', 'DOUBLE', 'TEXT', 'BOOLEAN']
# Position of each series among the de-duplicated value columns of the tsblocks.
column_name_index = {
    name: position
    for position, name in enumerate([
        'root.wt1.windspeed',
        'root.wt1.temperature',
        'root.wt1.hardware',
        'root.wt1.status',
        'root.wt1.altitude',
        'root.wt1.angle',
    ])
}
+
+
def test_simple_query():
    """A single tsblock without nulls decodes to the expected DataFrame."""
    row_count = 20
    expected = pd.DataFrame({
        ts_path_lst[0]: np.array(
            [0.38830405, 0.13355169, 0.09433287, 0.31279156, 0.8401496,
             0.28991386, 0.06868938, 0.46327657, 0.18984757, 0.93103373,
             0.6931865, 0.57700926, 0.7871424, 0.71476793, 0.8190981,
             0.13309939, 0.93298537, 0.71830374, 0.4302311, 0.8690279],
            dtype="float32",
        ),
        ts_path_lst[1]: [0.14837307, 0.29024481, 0.62946148, 0.79063227, 0.08269055,
                         0.93563022, 0.48093016, 0.44585047, 0.92432981, 0.52788153,
                         0.47100021, 0.02110239, 0.37659929, 0.37622376, 0.31649863,
                         0.86574633, 0.48679356, 0.21717449, 0.27767639, 0.83347897],
        ts_path_lst[2]: np.array(
            [93, 64, 47, 23, 80, 79, 15, 43, 79, 18, 17, 58, 76, 45, 27, 87, 91, 25, 72, 95],
            dtype="int32",
        ),
        ts_path_lst[3]: [92, 87, 13, 83, 71, 64, 83, 62, 17, 80, 23, 28, 93, 30, 21, 86, 81,
                         45, 62, 89],
        ts_path_lst[4]: [False, True, False, True, True, False, True, True, True,
                         True, False, True, False, False, False, False, True, True,
                         True, False],
        ts_path_lst[5]: ['text1', 'text2', 'text2', 'text1', 'text1', 'text1', 'text2',
                         'text2', 'text2', 'text1', 'text1', 'text1', 'text2', 'text2',
                         'text1', 'text2', 'text1', 'text1', 'text1', 'text1'],
    })
    expected.insert(0, "Time", np.arange(row_count, dtype="int64"))

    actual = convert_to_df(column_names, data_type_list, column_name_index, simple_binary)
    assert_frame_equal(expected, actual[expected.columns.tolist()])
+
+
def test_with_null_query():
    """Two tsblocks containing nulls decode to the expected nullable columns."""
    row_count = 20
    expected = pd.DataFrame({
        ts_path_lst[0]: pd.Series(
            [0.049136366695165634, 0.4814307689666748, None, None, 0.3209269344806671,
             0.3143160343170166, 0.030140314251184464, None, 0.027547180652618408, 0.2688102126121521,
             0.9617357850074768, None, 0.30917245149612427, 0.4494488537311554, 0.41459938883781433,
             None, 0.6761381030082703, 0.6640862822532654, 0.4531949460506439, 0.5660725235939026]
        ).astype("float32"),
        ts_path_lst[1]: [0.5009131121558088, None, 0.26937357096243686, None, 0.3151998654674619, None, None, None,
                         0.8431532356866694, 0.5903086429020434, None, None, 0.1315312090066042, None,
                         0.7526326461335474, 0.9305597549133965, None, None, 0.8168103309315078,
                         0.30906251401349627],
        ts_path_lst[2]: pd.Series(
            [91, None, None, None, 11, 84, None, 80, 87, None, 17, None, None, 41, None, None, 60, 14,
             24, None]
        ).astype("Int32"),
        ts_path_lst[3]: pd.Series(
            [None, None, None, 66, None, None, None, 21, None, 14, None, None, 86, None, None, 62, None,
             35, None, 63]
        ).astype("Int64"),
        ts_path_lst[4]: pd.Series(
            [True, False, None, False, True, None, None, False, None, None, None, False, None, None,
             True, False, False, None, False, None]
        ).astype("boolean"),
        ts_path_lst[5]: ['text1', None, 'text1', 'text2', 'text1', None, None, None, None, None, 'text1', None,
                         None, None, 'text2', 'text2', None, 'text2', None, 'text1'],
    })
    expected.insert(0, "Time", np.arange(row_count, dtype="int64"))

    actual = convert_to_df(column_names, data_type_list, column_name_index, binary_with_null)
    assert_frame_equal(expected, actual[expected.columns.tolist()])