Posted to commits@ignite.apache.org by iv...@apache.org on 2021/04/12 09:51:17 UTC

[ignite-python-thin-client] branch master updated: IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.

This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 3586db7  IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.
3586db7 is described below

commit 3586db7c30e22aff61d2251fec1ffaa355b4c599
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Mon Apr 12 12:50:49 2021 +0300

    IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.
---
 pyignite/binary.py                      |   2 +-
 pyignite/datatypes/base.py              |   6 +-
 pyignite/datatypes/cache_properties.py  |  14 +-
 pyignite/datatypes/complex.py           | 595 +++++++++++++++-----------------
 pyignite/datatypes/internal.py          | 199 +++++------
 pyignite/datatypes/primitive.py         |  14 +-
 pyignite/datatypes/primitive_arrays.py  | 142 ++++----
 pyignite/datatypes/primitive_objects.py |  21 +-
 pyignite/datatypes/standard.py          | 280 +++++++--------
 pyignite/queries/response.py            |  64 ++--
 tests/common/test_datatypes.py          |  36 +-
 tests/common/test_key_value.py          |  11 +-
 tests/common/test_sql.py                | 128 +++++++
 13 files changed, 783 insertions(+), 729 deletions(-)
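
The headline change is how Python bytes values are serialized: they are now inferred as ByteArrayObject instead of String (see the pyignite/datatypes/internal.py hunk below). A minimal sketch of the resulting behaviour, assuming a local Ignite node on 127.0.0.1:10800 and a hypothetical cache name; this is an illustration, not part of the patch:

    from pyignite import Client

    client = Client()
    client.connect('127.0.0.1', 10800)
    cache = client.get_or_create_cache('bytes-roundtrip')

    # With this commit, a bytes value is written as a byte array rather than
    # a string, so it is read back as bytes.
    cache.put('k', b'\x00\x01\x02')
    assert cache.get('k') == b'\x00\x01\x02'

    client.close()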

diff --git a/pyignite/binary.py b/pyignite/binary.py
index 5a5f895..551f1d0 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -151,7 +151,7 @@ class GenericObjectMeta(GenericObjectPropsMeta):
             write_footer(self, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf)
 
         def write_header(obj, stream):
-            header_class = BinaryObject.build_header()
+            header_class = BinaryObject.get_header_class()
             header = header_class()
             header.type_code = int.from_bytes(
                 BinaryObject.type_code,
diff --git a/pyignite/datatypes/base.py b/pyignite/datatypes/base.py
index fbd798b..87b251c 100644
--- a/pyignite/datatypes/base.py
+++ b/pyignite/datatypes/base.py
@@ -72,9 +72,9 @@ class IgniteDataType(metaclass=IgniteDataTypeMeta):
         cls.from_python(stream, value, **kwargs)
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
+    def to_python(cls, ctypes_object, *args, **kwargs):
         raise NotImplementedError
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        return cls.to_python(ctype_object, *args, **kwargs)
+    async def to_python_async(cls, ctypes_object, *args, **kwargs):
+        return cls.to_python(ctypes_object, *args, **kwargs)
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index d924507..9bf34de 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -115,12 +115,12 @@ class PropBase:
         return cls.parse(stream)
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return cls.prop_data_class.to_python(ctype_object.data, *args, **kwargs)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        return cls.to_python(ctype_object, *args, **kwargs)
+    async def to_python_async(cls, ctypes_object, *args, **kwargs):
+        return cls.to_python(ctypes_object, *args, **kwargs)
 
     @classmethod
     def from_python(cls, stream, value):
@@ -295,6 +295,6 @@ class AnyProperty(PropBase):
         )
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        prop_data_class = prop_map(ctype_object.prop_code)
-        return prop_data_class.to_python(ctype_object.data, *args, **kwargs)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        prop_data_class = prop_map(ctypes_object.prop_code)
+        return prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index 5cb6160..119c552 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -20,6 +20,7 @@ from typing import Optional
 
 from pyignite.constants import *
 from pyignite.exceptions import ParseError
+from .base import IgniteDataType
 from .internal import AnyDataObject, Struct, infer_from_python, infer_from_python_async
 from .type_codes import *
 from .type_ids import *
@@ -41,122 +42,100 @@ class ObjectArrayObject(Nullable):
 
     _type_name = NAME_OBJ_ARR
     _type_id = TYPE_OBJ_ARR
+    _fields = [
+        ('type_code', ctypes.c_byte),
+        ('type_id', ctypes.c_int),
+        ('length', ctypes.c_int)
+    ]
     type_code = TC_OBJECT_ARRAY
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('type_id', ctypes.c_int),
-                    ('length', ctypes.c_int),
-                ],
-            }
-        )
-
-    @classmethod
     def parse_not_null(cls, stream):
-        header, header_class = cls.__parse_header(stream)
+        length, fields = cls.__get_length(stream), []
 
-        fields = []
-        for i in range(header.length):
+        for i in range(length):
             c_type = AnyDataObject.parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return cls.__build_final_class(header_class, fields)
+        return cls.__build_final_class(fields)
 
     @classmethod
     async def parse_not_null_async(cls, stream):
-        header, header_class = cls.__parse_header(stream)
-
-        fields = []
-        for i in range(header.length):
+        length, fields = cls.__get_length(stream), []
+        for i in range(length):
             c_type = await AnyDataObject.parse_async(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return cls.__build_final_class(header_class, fields)
+        return cls.__build_final_class(fields)
 
     @classmethod
-    def __parse_header(cls, stream):
-        header_class = cls.build_header()
-        header = stream.read_ctype(header_class)
-        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
-        return header, header_class
+    def __get_length(cls, stream):
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz + int_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(2 * int_sz + b_sz, SEEK_CUR)
+        return length
 
     @classmethod
-    def __build_final_class(cls, header_class, fields):
+    def __build_final_class(cls, fields):
         return type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
-                '_fields_': fields,
+                '_fields_': cls._fields + fields,
             }
         )
 
     @classmethod
-    def to_python_not_null(cls, ctype_object, *args, **kwargs):
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         result = []
-        for i in range(ctype_object.length):
+        for i in range(ctypes_object.length):
             result.append(
                 AnyDataObject.to_python(
-                    getattr(ctype_object, 'element_{}'.format(i)),
+                    getattr(ctypes_object, f'element_{i}'),
                     *args, **kwargs
                 )
             )
-        return ctype_object.type_id, result
+        return ctypes_object.type_id, result
 
     @classmethod
-    async def to_python_not_null_async(cls, ctype_object, *args, **kwargs):
+    async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
         result = [
             await AnyDataObject.to_python_async(
-                getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs
+                getattr(ctypes_object, f'element_{i}'), *args, **kwargs
             )
-            for i in range(ctype_object.length)]
-        return ctype_object.type_id, result
+            for i in range(ctypes_object.length)]
+        return ctypes_object.type_id, result
 
     @classmethod
     def from_python_not_null(cls, stream, value, *args, **kwargs):
-        type_or_id, value = value
-        try:
-            length = len(value)
-        except TypeError:
-            value = [value]
-            length = 1
-
-        cls.__write_header(stream, type_or_id, length)
+        value = cls.__write_header(stream, value)
         for x in value:
             infer_from_python(stream, x)
 
     @classmethod
     async def from_python_not_null_async(cls, stream, value, *args, **kwargs):
-        type_or_id, value = value
+        value = cls.__write_header(stream, value)
+        for x in value:
+            await infer_from_python_async(stream, x)
+
+    @classmethod
+    def __write_header(cls, stream, value):
+        type_id, value = value
         try:
             length = len(value)
         except TypeError:
             value = [value]
             length = 1
 
-        cls.__write_header(stream, type_or_id, length)
-        for x in value:
-            await infer_from_python_async(stream, x)
+        stream.write(cls.type_code)
+        stream.write(type_id.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+        stream.write(length.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
 
-    @classmethod
-    def __write_header(cls, stream, type_or_id, length):
-        header_class = cls.build_header()
-        header = header_class()
-        header.type_code = int.from_bytes(
-            cls.type_code,
-            byteorder=PROTOCOL_BYTE_ORDER
-        )
-        header.length = length
-        header.type_id = type_or_id
-
-        stream.write(header)
+        return value
 
 
 class WrappedDataObject(Nullable):
@@ -171,31 +150,22 @@ class WrappedDataObject(Nullable):
     type_code = TC_ARRAY_WRAPPED_OBJECTS
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('length', ctypes.c_int),
-                ],
-            }
-        )
-
-    @classmethod
     def parse_not_null(cls, stream):
-        header_class = cls.build_header()
-        header = stream.read_ctype(header_class)
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
 
         final_class = type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': [
-                    ('payload', ctypes.c_byte * header.length),
+                    ('type_code', ctypes.c_byte),
+                    ('length', ctypes.c_int),
+                    ('payload', ctypes.c_byte * length),
                     ('offset', ctypes.c_int),
                 ],
             }
@@ -205,11 +175,11 @@ class WrappedDataObject(Nullable):
         return final_class
 
     @classmethod
-    def to_python_not_null(cls, ctype_object, *args, **kwargs):
-        return bytes(ctype_object.payload), ctype_object.offset
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return bytes(ctypes_object.payload), ctypes_object.offset
 
     @classmethod
-    def from_python(cls, stream, value, *args, **kwargs):
+    def from_python_not_null(cls, stream, value, *args, **kwargs):
         raise ParseError('Send unwrapped data.')
 
 
@@ -251,59 +221,47 @@ class CollectionObject(Nullable):
 
     _type_name = NAME_COL
     _type_id = TYPE_COL
+    _header_class = None
     type_code = TC_COLLECTION
     pythonic = list
     default = []
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('length', ctypes.c_int),
-                    ('type', ctypes.c_byte),
-                ],
-            }
-        )
-
-    @classmethod
     def parse_not_null(cls, stream):
-        header, header_class = cls.__parse_header(stream)
+        fields, length = cls.__parse_header(stream)
 
-        fields = []
-        for i in range(header.length):
+        for i in range(length):
             c_type = AnyDataObject.parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return cls.__build_final_class(header_class, fields)
+        return cls.__build_final_class(fields)
 
     @classmethod
     async def parse_not_null_async(cls, stream):
-        header, header_class = cls.__parse_header(stream)
+        fields, length = cls.__parse_header(stream)
 
-        fields = []
-        for i in range(header.length):
+        for i in range(length):
             c_type = await AnyDataObject.parse_async(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return cls.__build_final_class(header_class, fields)
+        return cls.__build_final_class(fields)
 
     @classmethod
     def __parse_header(cls, stream):
-        header_class = cls.build_header()
-        header = stream.read_ctype(header_class)
-        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
-        return header, header_class
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        header_fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)]
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(int_sz + 2 * b_sz, SEEK_CUR)
+        return header_fields, length
 
     @classmethod
-    def __build_final_class(cls, header_class, fields):
+    def __build_final_class(cls, fields):
         return type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': fields,
@@ -311,134 +269,91 @@ class CollectionObject(Nullable):
         )
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        length = cls.__get_length(ctype_object)
-        if length is None:
-            return None
-
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         result = [
-            AnyDataObject.to_python(getattr(ctype_object, f'element_{i}'), *args, **kwargs)
-            for i in range(length)
+            AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
+            for i in range(ctypes_object.length)
         ]
-        return ctype_object.type, result
+        return ctypes_object.type, result
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        length = cls.__get_length(ctype_object)
-        if length is None:
-            return None
-
+    async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
         result_coro = [
-            AnyDataObject.to_python_async(getattr(ctype_object, f'element_{i}'), *args, **kwargs)
-            for i in range(length)
+            AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
+            for i in range(ctypes_object.length)
         ]
 
-        return ctype_object.type, await asyncio.gather(*result_coro)
-
-    @classmethod
-    def __get_length(cls, ctype_object):
-        return getattr(ctype_object, "length", None)
+        return ctypes_object.type, await asyncio.gather(*result_coro)
 
     @classmethod
     def from_python_not_null(cls, stream, value, *args, **kwargs):
-        type_or_id, value = value
+        type_id, value = value
         try:
             length = len(value)
         except TypeError:
             value = [value]
             length = 1
 
-        cls.__write_header(stream, type_or_id, length)
+        cls.__write_header(stream, type_id, length)
         for x in value:
             infer_from_python(stream, x)
 
     @classmethod
     async def from_python_not_null_async(cls, stream, value, *args, **kwargs):
-        type_or_id, value = value
+        type_id, value = value
         try:
             length = len(value)
         except TypeError:
             value = [value]
             length = 1
 
-        cls.__write_header(stream, type_or_id, length)
+        cls.__write_header(stream, type_id, length)
         for x in value:
             await infer_from_python_async(stream, x)
 
     @classmethod
-    def __write_header(cls, stream, type_or_id, length):
-        header_class = cls.build_header()
-        header = header_class()
-        header.type_code = int.from_bytes(
-            cls.type_code,
-            byteorder=PROTOCOL_BYTE_ORDER
+    def __write_header(cls, stream, type_id, length):
+        stream.write(cls.type_code)
+        stream.write(length.to_bytes(
+            ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER
+        ))
+        stream.write(type_id.to_bytes(
+            length=ctypes.sizeof(ctypes.c_byte),
+            byteorder=PROTOCOL_BYTE_ORDER,
+            signed=True)
         )
 
-        header.length = length
-        header.type = type_or_id
-
-        stream.write(header)
-
 
-class Map(Nullable):
-    """
-    Dictionary type, payload-only.
-
-    Ignite does not track the order of key-value pairs in its caches, hence
-    the ordinary Python dict type, not the collections.OrderedDict.
-    """
-    _type_name = NAME_MAP
-    _type_id = TYPE_MAP
+class _MapBase:
     HASH_MAP = 1
     LINKED_HASH_MAP = 2
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('length', ctypes.c_int),
-                ],
-            }
-        )
+    def _parse_header(cls, stream):
+        raise NotImplementedError
 
     @classmethod
-    def parse_not_null(cls, stream):
-        header, header_class = cls.__parse_header(stream)
-
-        fields = []
-        for i in range(header.length << 1):
+    def _parse(cls, stream):
+        fields, length = cls._parse_header(stream)
+        for i in range(length << 1):
             c_type = AnyDataObject.parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
-
-        return cls.__build_final_class(header_class, fields)
+            fields.append((f'element_{i}', c_type))
+        return cls.__build_final_class(fields)
 
     @classmethod
-    async def parse_not_null_async(cls, stream):
-        header, header_class = cls.__parse_header(stream)
-
-        fields = []
-        for i in range(header.length << 1):
+    async def _parse_async(cls, stream):
+        fields, length = cls._parse_header(stream)
+        for i in range(length << 1):
             c_type = await AnyDataObject.parse_async(stream)
-            fields.append(('element_{}'.format(i), c_type))
-
-        return cls.__build_final_class(header_class, fields)
+            fields.append((f'element_{i}', c_type))
 
-    @classmethod
-    def __parse_header(cls, stream):
-        header_class = cls.build_header()
-        header = stream.read_ctype(header_class)
-        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
-        return header, header_class
+        return cls.__build_final_class(fields)
 
     @classmethod
-    def __build_final_class(cls, header_class, fields):
+    def __build_final_class(cls, fields):
         return type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': fields,
@@ -446,76 +361,118 @@ class Map(Nullable):
         )
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        map_cls = cls.__get_map_class(ctype_object)
+    def _to_python(cls, ctypes_object, *args, **kwargs):
+        map_cls = cls.__get_map_class(ctypes_object)
 
         result = map_cls()
-        for i in range(0, ctype_object.length << 1, 2):
+        for i in range(0, ctypes_object.length << 1, 2):
             k = AnyDataObject.to_python(
-                getattr(ctype_object, 'element_{}'.format(i)),
+                getattr(ctypes_object, f'element_{i}'),
                 *args, **kwargs
             )
             v = AnyDataObject.to_python(
-                getattr(ctype_object, 'element_{}'.format(i + 1)),
+                getattr(ctypes_object, f'element_{i + 1}'),
                 *args, **kwargs
             )
             result[k] = v
         return result
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        map_cls = cls.__get_map_class(ctype_object)
+    async def _to_python_async(cls, ctypes_object, *args, **kwargs):
+        map_cls = cls.__get_map_class(ctypes_object)
 
         kv_pairs_coro = [
             asyncio.gather(
                 AnyDataObject.to_python_async(
-                    getattr(ctype_object, 'element_{}'.format(i)),
+                    getattr(ctypes_object, f'element_{i}'),
                     *args, **kwargs
                 ),
                 AnyDataObject.to_python_async(
-                    getattr(ctype_object, 'element_{}'.format(i + 1)),
+                    getattr(ctypes_object, f'element_{i + 1}'),
                     *args, **kwargs
                 )
-            ) for i in range(0, ctype_object.length << 1, 2)
+            ) for i in range(0, ctypes_object.length << 1, 2)
         ]
 
         return map_cls(await asyncio.gather(*kv_pairs_coro))
 
     @classmethod
-    def __get_map_class(cls, ctype_object):
-        map_type = getattr(ctype_object, 'type', cls.HASH_MAP)
+    def __get_map_class(cls, ctypes_object):
+        map_type = getattr(ctypes_object, 'type', cls.HASH_MAP)
         return OrderedDict if map_type == cls.LINKED_HASH_MAP else dict
 
     @classmethod
-    def from_python(cls, stream, value, type_id=None):
-        cls.__write_header(stream, type_id, len(value))
+    def _from_python(cls, stream, value, type_id=None):
+        cls._write_header(stream, type_id, len(value))
         for k, v in value.items():
             infer_from_python(stream, k)
             infer_from_python(stream, v)
 
     @classmethod
-    async def from_python_async(cls, stream, value, type_id=None):
-        cls.__write_header(stream, type_id, len(value))
+    async def _from_python_async(cls, stream, value, type_id):
+        cls._write_header(stream, type_id, len(value))
         for k, v in value.items():
             await infer_from_python_async(stream, k)
             await infer_from_python_async(stream, v)
 
     @classmethod
-    def __write_header(cls, stream, type_id, length):
-        header_class = cls.build_header()
-        header = header_class()
-        header.length = length
+    def _write_header(cls, stream, type_id, length):
+        raise NotImplementedError
 
-        if hasattr(header, 'type_code'):
-            header.type_code = int.from_bytes(cls.type_code, byteorder=PROTOCOL_BYTE_ORDER)
 
-        if hasattr(header, 'type'):
-            header.type = type_id
+class Map(IgniteDataType, _MapBase):
+    """
+    Dictionary type, payload-only.
 
-        stream.write(header)
+    Ignite does not track the order of key-value pairs in its caches, hence
+    the ordinary Python dict type, not the collections.OrderedDict.
+    """
+    _type_name = NAME_MAP
+    _type_id = TYPE_MAP
+
+    @classmethod
+    def parse(cls, stream):
+        return cls._parse(stream)
+
+    @classmethod
+    async def parse_async(cls, stream):
+        return await cls._parse_async(stream)
+
+    @classmethod
+    def _parse_header(cls, stream):
+        int_sz = ctypes.sizeof(ctypes.c_int)
+        length = int.from_bytes(
+            stream.slice(stream.tell(), int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(int_sz, SEEK_CUR)
+        return [('length', ctypes.c_int)], length
+
+    @classmethod
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return cls._to_python(ctypes_object, *args, **kwargs)
+
+    @classmethod
+    async def to_python_async(cls, ctypes_object, *args, **kwargs):
+        return await cls._to_python_async(ctypes_object, *args, **kwargs)
+
+    @classmethod
+    def from_python(cls, stream, value, type_id=None):
+        return cls._from_python(stream, value, type_id)
+
+    @classmethod
+    async def from_python_async(cls, stream, value, type_id=None):
+        return await cls._from_python_async(stream, value, type_id)
+
+    @classmethod
+    def _write_header(cls, stream, type_id, length):
+        stream.write(length.to_bytes(
+            length=ctypes.sizeof(ctypes.c_int),
+            byteorder=PROTOCOL_BYTE_ORDER
+        ))
 
 
-class MapObject(Map):
+class MapObject(Nullable, _MapBase):
     """
     This is a dictionary type.
 
@@ -531,61 +488,65 @@ class MapObject(Map):
     default = {}
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('length', ctypes.c_int),
-                    ('type', ctypes.c_byte),
-                ],
-            }
-        )
+    def parse_not_null(cls, stream):
+        return cls._parse(stream)
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        obj_type = getattr(ctype_object, "type", None)
-        if obj_type:
-            return obj_type, super().to_python(ctype_object, *args, **kwargs)
-        return None
+    async def parse_not_null_async(cls, stream):
+        return await cls._parse_async(stream)
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        obj_type = getattr(ctype_object, "type", None)
-        if obj_type:
-            return obj_type, await super().to_python_async(ctype_object, *args, **kwargs)
-        return None
+    def _parse_header(cls, stream):
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(int_sz + 2 * b_sz, SEEK_CUR)
+        fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)]
+        return fields, length
 
     @classmethod
-    def __get_obj_type(cls, ctype_object):
-        return getattr(ctype_object, "type", None)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs)
 
     @classmethod
-    def from_python(cls, stream, value, **kwargs):
-        type_id, value = cls.__unpack_value(stream, value)
-        if value:
-            super().from_python(stream, value, type_id)
+    async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs)
 
     @classmethod
-    async def from_python_async(cls, stream, value, **kwargs):
-        type_id, value = cls.__unpack_value(stream, value)
-        if value:
-            await super().from_python_async(stream, value, type_id)
+    def from_python_not_null(cls, stream, value, **kwargs):
+        type_id, value = value
+        if value is None:
+            Null.from_python(stream)
+        else:
+            cls._from_python(stream, value, type_id)
 
     @classmethod
-    def __unpack_value(cls, stream, value):
+    async def from_python_not_null_async(cls, stream, value, **kwargs):
+        type_id, value = value
         if value is None:
             Null.from_python(stream)
-            return None, None
+        else:
+            await cls._from_python_async(stream, value, type_id)
 
-        return value
+    @classmethod
+    def _write_header(cls, stream, type_id, length):
+        stream.write(cls.type_code)
+        stream.write(length.to_bytes(
+            length=ctypes.sizeof(ctypes.c_int),
+            byteorder=PROTOCOL_BYTE_ORDER)
+        )
+        stream.write(type_id.to_bytes(
+            length=ctypes.sizeof(ctypes.c_byte),
+            byteorder=PROTOCOL_BYTE_ORDER,
+            signed=True)
+        )
 
 
 class BinaryObject(Nullable):
     _type_id = TYPE_BINARY_OBJ
+    _header_class = None
     type_code = TC_COMPLEX_OBJECT
 
     USER_TYPE = 0x0001
@@ -615,24 +576,26 @@ class BinaryObject(Nullable):
         return value._hashcode
 
     @classmethod
-    def build_header(cls):
-        return type(
-            cls.__name__,
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('version', ctypes.c_byte),
-                    ('flags', ctypes.c_short),
-                    ('type_id', ctypes.c_int),
-                    ('hash_code', ctypes.c_int),
-                    ('length', ctypes.c_int),
-                    ('schema_id', ctypes.c_int),
-                    ('schema_offset', ctypes.c_int),
-                ],
-            }
-        )
+    def get_header_class(cls):
+        if not cls._header_class:
+            cls._header_class = type(
+                cls.__name__,
+                (ctypes.LittleEndianStructure,),
+                {
+                    '_pack_': 1,
+                    '_fields_': [
+                        ('type_code', ctypes.c_byte),
+                        ('version', ctypes.c_byte),
+                        ('flags', ctypes.c_short),
+                        ('type_id', ctypes.c_int),
+                        ('hash_code', ctypes.c_int),
+                        ('length', ctypes.c_int),
+                        ('schema_id', ctypes.c_int),
+                        ('schema_offset', ctypes.c_int),
+                    ],
+                }
+            )
+        return cls._header_class
 
     @classmethod
     def offset_c_type(cls, flags: int):
@@ -686,7 +649,7 @@ class BinaryObject(Nullable):
 
     @classmethod
     def __parse_header(cls, stream):
-        header_class = cls.build_header()
+        header_class = cls.get_header_class()
         header = stream.read_ctype(header_class)
         stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
         return header, header_class
@@ -717,51 +680,51 @@ class BinaryObject(Nullable):
         return final_class
 
     @classmethod
-    def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
-        type_id = cls.__get_type_id(ctype_object, client)
-        if type_id:
-            data_class = client.query_binary_type(type_id, ctype_object.schema_id)
-
-            result = data_class()
-            result.version = ctype_object.version
-            for field_name, field_type in data_class.schema.items():
-                setattr(
-                    result, field_name, field_type.to_python(
-                        getattr(ctype_object.object_fields, field_name),
-                        client, *args, **kwargs
-                    )
-                )
-            return result
+    def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs):
+        type_id = ctypes_object.type_id
+        if not client:
+            raise ParseError(f'Can not query binary type {type_id}')
 
-        return None
+        data_class = client.query_binary_type(type_id, ctypes_object.schema_id)
+        result = data_class()
+        result.version = ctypes_object.version
 
-    @classmethod
-    async def to_python_async(cls, ctype_object, client: 'AioClient' = None, *args, **kwargs):
-        type_id = cls.__get_type_id(ctype_object, client)
-        if type_id:
-            data_class = await client.query_binary_type(type_id, ctype_object.schema_id)
-
-            result = data_class()
-            result.version = ctype_object.version
-
-            field_values = await asyncio.gather(
-                *[
-                    field_type.to_python_async(
-                        getattr(ctype_object.object_fields, field_name), client, *args, **kwargs
-                    )
-                    for field_name, field_type in data_class.schema.items()
-                ]
+        for field_name, field_type in data_class.schema.items():
+            setattr(
+                result, field_name, field_type.to_python(
+                    getattr(ctypes_object.object_fields, field_name),
+                    client, *args, **kwargs
+                )
             )
+        return result
 
-            for i, field_name in enumerate(data_class.schema.keys()):
-                setattr(result, field_name, field_values[i])
+    @classmethod
+    async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs):
+        type_id = ctypes_object.type_id
+        if not client:
+            raise ParseError(f'Can not query binary type {type_id}')
 
-            return result
-        return None
+        data_class = await client.query_binary_type(type_id, ctypes_object.schema_id)
+        result = data_class()
+        result.version = ctypes_object.version
+
+        field_values = await asyncio.gather(
+            *[
+                field_type.to_python_async(
+                    getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs
+                )
+                for field_name, field_type in data_class.schema.items()
+            ]
+        )
+
+        for i, field_name in enumerate(data_class.schema.keys()):
+            setattr(result, field_name, field_values[i])
+
+        return result
 
     @classmethod
-    def __get_type_id(cls, ctype_object, client):
-        type_id = getattr(ctype_object, "type_id", None)
+    def __get_type_id(cls, ctypes_object, client):
+        type_id = getattr(ctypes_object, "type_id", None)
         if type_id:
             if not client:
                 raise ParseError(f'Can not query binary type {type_id}')
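
The CollectionObject/Map/MapObject rework above keeps the public value format: a map object is passed and returned as a (type_id, dict) pair, with MapObject.HASH_MAP (1) or MapObject.LINKED_HASH_MAP (2) as the type id, the latter deserializing into an OrderedDict. A hedged usage sketch, with a hypothetical node address and cache name:

    from pyignite import Client
    from pyignite.datatypes import MapObject

    client = Client()
    client.connect('127.0.0.1', 10800)
    cache = client.get_or_create_cache('map-objects')

    # A MapObject value is a (type_id, dict) pair; LINKED_HASH_MAP would be
    # read back as an OrderedDict, HASH_MAP as a plain dict.
    cache.put('k', (MapObject.HASH_MAP, {'a': 1, 'b': 2}), value_hint=MapObject)
    type_id, data = cache.get('k')
    assert type_id == MapObject.HASH_MAP and data == {'a': 1, 'b': 2}

    client.close()
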
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 55ed844..9bd1b76 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -136,15 +136,15 @@ class Conditional:
             return await self.var1.parse_async(stream)
         return await self.var2.parse_async(stream)
 
-    def to_python(self, ctype_object, context, *args, **kwargs):
+    def to_python(self, ctypes_object, context, *args, **kwargs):
         if self.predicate2(context):
-            return self.var1.to_python(ctype_object, *args, **kwargs)
-        return self.var2.to_python(ctype_object, *args, **kwargs)
+            return self.var1.to_python(ctypes_object, *args, **kwargs)
+        return self.var2.to_python(ctypes_object, *args, **kwargs)
 
-    async def to_python_async(self, ctype_object, context, *args, **kwargs):
+    async def to_python_async(self, ctypes_object, context, *args, **kwargs):
         if self.predicate2(context):
-            return await self.var1.to_python_async(ctype_object, *args, **kwargs)
-        return await self.var2.to_python_async(ctype_object, *args, **kwargs)
+            return await self.var1.to_python_async(ctypes_object, *args, **kwargs)
+        return await self.var2.to_python_async(ctypes_object, *args, **kwargs)
 
 
 @attr.s
@@ -154,67 +154,56 @@ class StructArray:
     counter_type = attr.ib(default=ctypes.c_int)
     defaults = attr.ib(type=dict, default={})
 
-    def build_header_class(self):
-        return type(
-            self.__class__.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('length', self.counter_type),
-                ],
-            },
-        )
-
     def parse(self, stream):
-        fields, length = [], self.__parse_length(stream)
+        fields, length = self.__parse_header(stream)
 
         for i in range(length):
             c_type = Struct(self.following).parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return self.__build_final_class(fields)
+        return self.build_c_type(fields)
 
     async def parse_async(self, stream):
-        fields, length = [], self.__parse_length(stream)
+        fields, length = self.__parse_header(stream)
 
         for i in range(length):
             c_type = await Struct(self.following).parse_async(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        return self.__build_final_class(fields)
+        return self.build_c_type(fields)
 
-    def __parse_length(self, stream):
-        counter_type_len = ctypes.sizeof(self.counter_type)
+    def __parse_header(self, stream):
+        counter_sz = ctypes.sizeof(self.counter_type)
         length = int.from_bytes(
-            stream.slice(offset=counter_type_len),
+            stream.slice(offset=counter_sz),
             byteorder=PROTOCOL_BYTE_ORDER
         )
-        stream.seek(counter_type_len, SEEK_CUR)
-        return length
+        stream.seek(counter_sz, SEEK_CUR)
+        return [('length', self.counter_type)], length
 
-    def __build_final_class(self, fields):
+    @staticmethod
+    def build_c_type(fields):
         return type(
             'StructArray',
-            (self.build_header_class(),),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': fields,
             },
         )
 
-    def to_python(self, ctype_object, *args, **kwargs):
-        length = getattr(ctype_object, 'length', 0)
+    def to_python(self, ctypes_object, *args, **kwargs):
+        length = getattr(ctypes_object, 'length', 0)
         return [
-            Struct(self.following, dict_type=dict).to_python(getattr(ctype_object, 'element_{}'.format(i)),
+            Struct(self.following, dict_type=dict).to_python(getattr(ctypes_object, f'element_{i}'),
                                                              *args, **kwargs)
             for i in range(length)
         ]
 
-    async def to_python_async(self, ctype_object, *args, **kwargs):
-        length = getattr(ctype_object, 'length', 0)
+    async def to_python_async(self, ctypes_object, *args, **kwargs):
+        length = getattr(ctypes_object, 'length', 0)
         result_coro = [
-            Struct(self.following, dict_type=dict).to_python_async(getattr(ctype_object, 'element_{}'.format(i)),
+            Struct(self.following, dict_type=dict).to_python_async(getattr(ctypes_object, f'element_{i}'),
                                                                    *args, **kwargs)
             for i in range(length)
         ]
@@ -239,10 +228,10 @@ class StructArray:
                 await el_class.from_python_async(stream, v[name])
 
     def __write_header(self, stream, length):
-        header_class = self.build_header_class()
-        header = header_class()
-        header.length = length
-        stream.write(header)
+        stream.write(
+            length.to_bytes(ctypes.sizeof(self.counter_type),
+                            byteorder=PROTOCOL_BYTE_ORDER)
+        )
 
 
 @attr.s
@@ -262,7 +251,7 @@ class Struct:
             if name in ctx:
                 ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
 
-        return self.__build_final_class(fields)
+        return self.build_c_type(fields)
 
     async def parse_async(self, stream):
         fields, ctx = [], self.__prepare_conditional_ctx()
@@ -274,7 +263,7 @@ class Struct:
             if name in ctx:
                 ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
 
-        return self.__build_final_class(fields)
+        return self.build_c_type(fields)
 
     def __prepare_conditional_ctx(self):
         ctx = {}
@@ -285,7 +274,7 @@ class Struct:
         return ctx
 
     @staticmethod
-    def __build_final_class(fields):
+    def build_c_type(fields):
         return type(
             'Struct',
             (ctypes.LittleEndianStructure,),
@@ -295,34 +284,34 @@ class Struct:
             },
         )
 
-    def to_python(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]:
+    def to_python(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]:
         result = self.dict_type()
         for name, c_type in self.fields:
             is_cond = isinstance(c_type, Conditional)
             result[name] = c_type.to_python(
-                getattr(ctype_object, name),
+                getattr(ctypes_object, name),
                 result,
                 *args, **kwargs
             ) if is_cond else c_type.to_python(
-                getattr(ctype_object, name),
+                getattr(ctypes_object, name),
                 *args, **kwargs
             )
         return result
 
-    async def to_python_async(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]:
+    async def to_python_async(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]:
         result = self.dict_type()
         for name, c_type in self.fields:
             is_cond = isinstance(c_type, Conditional)
 
             if is_cond:
                 value = await c_type.to_python_async(
-                    getattr(ctype_object, name),
+                    getattr(ctypes_object, name),
                     result,
                     *args, **kwargs
                 )
             else:
                 value = await c_type.to_python_async(
-                    getattr(ctype_object, name),
+                    getattr(ctypes_object, name),
                     *args, **kwargs
                 )
             result[name] = value
@@ -405,18 +394,18 @@ class AnyDataObject:
             raise ParseError('Unknown type code: `{}`'.format(type_code))
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        data_class = cls.__data_class_from_ctype(ctype_object)
-        return data_class.to_python(ctype_object)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        data_class = cls.__data_class_from_ctype(ctypes_object)
+        return data_class.to_python(ctypes_object)
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        data_class = cls.__data_class_from_ctype(ctype_object)
-        return await data_class.to_python_async(ctype_object)
+    async def to_python_async(cls, ctypes_object, *args, **kwargs):
+        data_class = cls.__data_class_from_ctype(ctypes_object)
+        return await data_class.to_python_async(ctypes_object)
 
     @classmethod
-    def __data_class_from_ctype(cls, ctype_object):
-        type_code = ctype_object.type_code.to_bytes(
+    def __data_class_from_ctype(cls, ctypes_object):
+        type_code = ctypes_object.type_code.to_bytes(
             ctypes.sizeof(ctypes.c_byte),
             byteorder=PROTOCOL_BYTE_ORDER
         )
@@ -440,7 +429,7 @@ class AnyDataObject:
             int: LongObject,
             float: DoubleObject,
             str: String,
-            bytes: String,
+            bytes: ByteArrayObject,
             bytearray: ByteArrayObject,
             bool: BoolObject,
             type(None): Null,
@@ -455,7 +444,6 @@ class AnyDataObject:
             int: LongArrayObject,
             float: DoubleArrayObject,
             str: StringArrayObject,
-            bytes: StringArrayObject,
             bool: BoolArrayObject,
             uuid.UUID: UUIDArrayObject,
             datetime: DateArrayObject,
@@ -558,48 +546,33 @@ class AnyDataArray(AnyDataObject):
     """
     counter_type = attr.ib(default=ctypes.c_int)
 
-    def build_header(self):
-        return type(
-            self.__class__.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('length', self.counter_type),
-                ],
-            }
-        )
-
     def parse(self, stream):
-        header, header_class = self.__parse_header(stream)
-
-        fields = []
-        for i in range(header.length):
+        fields, length = self.__parse_header(stream)
+        for i in range(length):
             c_type = super().parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
-
-        return self.__build_final_class(header_class, fields)
+            fields.append((f'element_{i}', c_type))
+        return self.build_c_type(fields)
 
     async def parse_async(self, stream):
-        header, header_class = self.__parse_header(stream)
-
-        fields = []
-        for i in range(header.length):
+        fields, length = self.__parse_header(stream)
+        for i in range(length):
             c_type = await super().parse_async(stream)
-            fields.append(('element_{}'.format(i), c_type))
-
-        return self.__build_final_class(header_class, fields)
+            fields.append((f'element_{i}', c_type))
+        return self.build_c_type(fields)
 
     def __parse_header(self, stream):
-        header_class = self.build_header()
-        header = stream.read_ctype(header_class)
-        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
-        return header, header_class
+        cnt_sz = ctypes.sizeof(self.counter_type)
+        length = int.from_bytes(
+            stream.slice(stream.tell(), cnt_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(cnt_sz, SEEK_CUR)
+        return [('length', self.counter_type)], length
 
-    def __build_final_class(self, header_class, fields):
+    def build_c_type(self, fields):
         return type(
             self.__class__.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': fields,
@@ -607,56 +580,50 @@ class AnyDataArray(AnyDataObject):
         )
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        length = cls.__get_length(ctype_object)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        length = getattr(ctypes_object, "length", 0)
 
         return [
-            super().to_python(getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs)
+            super().to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
             for i in range(length)
         ]
 
     @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        length = cls.__get_length(ctype_object)
+    async def to_python_async(cls, ctypes_object, *args, **kwargs):
+        length = getattr(ctypes_object, "length", 0)
 
         values = asyncio.gather(
             *[
                 super().to_python(
-                    getattr(ctype_object, 'element_{}'.format(i)),
+                    getattr(ctypes_object, f'element_{i}'),
                     *args, **kwargs
                 ) for i in range(length)
             ]
         )
         return await values
 
-    @staticmethod
-    def __get_length(ctype_object):
-        return getattr(ctype_object, "length", None)
-
     def from_python(self, stream, value):
-        try:
-            length = len(value)
-        except TypeError:
-            value = [value]
-            length = 1
-        self.__write_header(stream, length)
+        value = self.__write_header_and_process_value(stream, value)
 
         for x in value:
             infer_from_python(stream, x)
 
     async def from_python_async(self, stream, value):
+        value = self.__write_header_and_process_value(stream, value)
+
+        for x in value:
+            await infer_from_python_async(stream, x)
+
+    def __write_header_and_process_value(self, stream, value):
         try:
             length = len(value)
         except TypeError:
             value = [value]
             length = 1
-        self.__write_header(stream, length)
 
-        for x in value:
-            await infer_from_python_async(stream, x)
+        stream.write(length.to_bytes(
+            ctypes.sizeof(self.counter_type),
+            byteorder=PROTOCOL_BYTE_ORDER
+        ))
 
-    def __write_header(self, stream, length):
-        header_class = self.build_header()
-        header = header_class()
-        header.length = length
-        stream.write(header)
+        return value
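
The behavioural core of the bytes fix is the small change to the type-inference maps above: bytes now maps to ByteArrayObject rather than String, and the bytes entry is dropped from the array map. A simplified sketch of that mapping, not the library's actual code:

    from pyignite.datatypes import (
        BoolObject, ByteArrayObject, DoubleObject, LongObject, String
    )

    # Abbreviated version of the Python-type -> Ignite-type inference map in
    # pyignite/datatypes/internal.py; after this commit, bytes maps to
    # ByteArrayObject instead of String.
    PYTHON_TYPE_MAP = {
        int: LongObject,
        float: DoubleObject,
        str: String,
        bytes: ByteArrayObject,
        bytearray: ByteArrayObject,
        bool: BoolObject,
    }

    def infer_ignite_type(value):
        # Pick the Ignite data type used to serialize a plain Python value.
        return PYTHON_TYPE_MAP[type(value)]

    assert infer_ignite_type(b'raw') is ByteArrayObject
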
diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py
index 3bbb196..037f680 100644
--- a/pyignite/datatypes/primitive.py
+++ b/pyignite/datatypes/primitive.py
@@ -52,8 +52,8 @@ class Primitive(IgniteDataType):
         return cls.c_type
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return ctype_object
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object
 
 
 class Byte(Primitive):
@@ -122,8 +122,8 @@ class Char(Primitive):
     c_type = ctypes.c_short
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return ctype_object.value.to_bytes(
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.value.to_bytes(
             ctypes.sizeof(cls.c_type),
             byteorder=PROTOCOL_BYTE_ORDER
         ).decode(PROTOCOL_CHAR_ENCODING)
@@ -147,9 +147,9 @@ class Bool(Primitive):
     c_type = ctypes.c_byte  # Use c_byte because c_bool throws endianness conversion error on BE systems.
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return ctype_object != 0
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object != 0
 
     @classmethod
-    def from_python(cls, stream, value):
+    def from_python(cls, stream, value, **kwargs):
         stream.write(struct.pack("<b", 1 if value else 0))
diff --git a/pyignite/datatypes/primitive_arrays.py b/pyignite/datatypes/primitive_arrays.py
index a21de77..e1d4289 100644
--- a/pyignite/datatypes/primitive_arrays.py
+++ b/pyignite/datatypes/primitive_arrays.py
@@ -17,6 +17,7 @@ import ctypes
 from io import SEEK_CUR
 
 from pyignite.constants import *
+from .base import IgniteDataType
 from .null_object import Nullable
 from .primitive import *
 from .type_codes import *
@@ -32,70 +33,50 @@ __all__ = [
 ]
 
 
-class PrimitiveArray(Nullable):
+class PrimitiveArray(IgniteDataType):
     """
     Base class for array of primitives. Payload-only.
     """
     _type_name = None
     _type_id = None
     primitive_type = None
-    type_code = None
 
     @classmethod
-    def build_header_class(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('length', ctypes.c_int),
-                ],
-            }
+    def build_c_type(cls, stream):
+        length = int.from_bytes(
+            stream.slice(stream.tell(), ctypes.sizeof(ctypes.c_int)),
+            byteorder=PROTOCOL_BYTE_ORDER
         )
 
-    @classmethod
-    def parse_not_null(cls, stream):
-        header_class = cls.build_header_class()
-        header = stream.read_ctype(header_class)
-
-        final_class = type(
+        return type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure, ),
             {
                 '_pack_': 1,
                 '_fields_': [
-                    ('data', cls.primitive_type.c_type * header.length),
+                    ('length', ctypes.c_int),
+                    ('data', cls.primitive_type.c_type * length),
                 ],
             }
         )
-        stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
-        return final_class
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        length = getattr(ctype_object, "length", None)
-        if length is None:
-            return None
-        return [ctype_object.data[i] for i in range(ctype_object.length)]
+    def parse(cls, stream):
+        c_type = cls.build_c_type(stream)
+        stream.seek(ctypes.sizeof(c_type), SEEK_CUR)
+        return c_type
 
     @classmethod
-    async def to_python_async(cls, ctypes_object, *args, **kwargs):
-        return cls.to_python(ctypes_object, *args, **kwargs)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return [ctypes_object.data[i] for i in range(ctypes_object.length)]
 
     @classmethod
-    def from_python_not_null(cls, stream, value, **kwargs):
-        header_class = cls.build_header_class()
-        header = header_class()
-        if hasattr(header, 'type_code'):
-            header.type_code = int.from_bytes(
-                cls.type_code,
-                byteorder=PROTOCOL_BYTE_ORDER
-            )
-        length = len(value)
-        header.length = length
+    def _write_header(cls, stream, value):
+        stream.write(len(value).to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
 
-        stream.write(header)
+    @classmethod
+    def from_python(cls, stream, value, **kwargs):
+        cls._write_header(stream, value)
         for x in value:
             cls.primitive_type.from_python(stream, x)
 
@@ -107,19 +88,12 @@ class ByteArray(PrimitiveArray):
     type_code = TC_BYTE_ARRAY
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        data = getattr(ctype_object, "data", None)
-        if data is None:
-            return None
-        return bytearray(data)
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return bytes(ctypes_object.data)
 
     @classmethod
-    def from_python(cls, stream, value):
-        header_class = cls.build_header_class()
-        header = header_class()
-        header.length = len(value)
-
-        stream.write(header)
+    def from_python(cls, stream, value, **kwargs):
+        cls._write_header(stream, value)
         stream.write(bytearray(value))
 
 
@@ -172,29 +146,58 @@ class BoolArray(PrimitiveArray):
     type_code = TC_BOOL_ARRAY
 
 
-class PrimitiveArrayObject(PrimitiveArray):
+class PrimitiveArrayObject(Nullable):
     """
     Base class for primitive array object. Type code plus payload.
     """
     _type_name = None
     _type_id = None
+    primitive_type = None
+    type_code = None
     pythonic = list
     default = []
 
     @classmethod
-    def build_header_class(cls):
+    def build_c_type(cls, stream):
+        length = int.from_bytes(
+            stream.slice(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+
         return type(
-            cls.__name__ + 'Header',
+            cls.__name__,
             (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': [
                     ('type_code', ctypes.c_byte),
                     ('length', ctypes.c_int),
+                    ('data', cls.primitive_type.c_type * length),
                 ],
             }
         )
 
+    @classmethod
+    def parse_not_null(cls, stream):
+        c_type = cls.build_c_type(stream)
+        stream.seek(ctypes.sizeof(c_type), SEEK_CUR)
+        return c_type
+
+    @classmethod
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return [ctypes_object.data[i] for i in range(ctypes_object.length)]
+
+    @classmethod
+    def from_python_not_null(cls, stream, value, **kwargs):
+        cls._write_header(stream, value)
+        for x in value:
+            cls.primitive_type.from_python(stream, x)
+
+    @classmethod
+    def _write_header(cls, stream, value):
+        stream.write(cls.type_code)
+        stream.write(len(value).to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
+
 
 class ByteArrayObject(PrimitiveArrayObject):
     _type_name = NAME_BYTE_ARR
@@ -203,19 +206,12 @@ class ByteArrayObject(PrimitiveArrayObject):
     type_code = TC_BYTE_ARRAY
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return ByteArray.to_python(ctype_object, *args, **kwargs)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return bytes(ctypes_object.data)
 
     @classmethod
-    def from_python_not_null(cls, stream, value):
-        header_class = cls.build_header_class()
-        header = header_class()
-        header.type_code = int.from_bytes(
-            cls.type_code,
-            byteorder=PROTOCOL_BYTE_ORDER
-        )
-        header.length = len(value)
-        stream.write(header)
+    def from_python_not_null(cls, stream, value, **kwargs):
+        cls._write_header(stream, value)
 
         if isinstance(value, (bytes, bytearray)):
             stream.write(value)
@@ -281,10 +277,8 @@ class CharArrayObject(PrimitiveArrayObject):
     type_code = TC_CHAR_ARRAY
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        values = super().to_python(ctype_object, *args, **kwargs)
-        if values is None:
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        values = super().to_python_not_null(ctypes_object, *args, **kwargs)
         return [
             v.to_bytes(
                 ctypes.sizeof(cls.primitive_type.c_type),
@@ -302,11 +296,5 @@ class BoolArrayObject(PrimitiveArrayObject):
     type_code = TC_BOOL_ARRAY
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        if not ctype_object:
-            return None
-        length = getattr(ctype_object, "length", None)
-        if length is None:
-            return None
-
-        return [ctype_object.data[i] != 0 for i in range(length)]
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return [ctypes_object.data[i] != 0 for i in range(ctypes_object.length)]
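
The primitive array classes above now build a single ctypes structure whose `data` field is sized from a length peeked out of the stream, instead of parsing a separate header class first, and byte arrays deserialize to immutable `bytes`. A minimal standalone sketch of the same payload-only layout (a 4-byte little-endian count followed by the elements; the names below are illustrative and not part of pyignite):

    import ctypes

    def build_int_array_type(length):
        # a 4-byte count followed by the inline payload, as in build_c_type above
        return type(
            'IntArraySketch',
            (ctypes.LittleEndianStructure,),
            {'_pack_': 1, '_fields_': [('length', ctypes.c_int), ('data', ctypes.c_int * length)]}
        )

    buf = (3).to_bytes(4, 'little') + b''.join(x.to_bytes(4, 'little') for x in (10, 20, 30))
    parsed = build_int_array_type(3).from_buffer_copy(buf)
    assert [parsed.data[i] for i in range(parsed.length)] == [10, 20, 30]

The *Object variants additionally prepend a one-byte type code, which is why their length is peeked at an offset of `ctypes.sizeof(ctypes.c_byte)`.
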
diff --git a/pyignite/datatypes/primitive_objects.py b/pyignite/datatypes/primitive_objects.py
index 5849935..9b23ec9 100644
--- a/pyignite/datatypes/primitive_objects.py
+++ b/pyignite/datatypes/primitive_objects.py
@@ -65,12 +65,8 @@ class DataObject(Nullable):
         return data_type
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        return getattr(ctype_object, "value", None)
-
-    @classmethod
-    async def to_python_async(cls, ctype_object, *args, **kwargs):
-        return cls.to_python(ctype_object, *args, **kwargs)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.value
 
     @classmethod
     def from_python_not_null(cls, stream, value, **kwargs):
@@ -188,10 +184,8 @@ class CharObject(DataObject):
         return ord(value)
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        value = getattr(ctype_object, "value", None)
-        if value is None:
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        value = ctypes_object.value
         return value.to_bytes(
             ctypes.sizeof(cls.c_type),
             byteorder=PROTOCOL_BYTE_ORDER
@@ -224,8 +218,5 @@ class BoolObject(DataObject):
         return 1231 if value else 1237
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        value = getattr(ctype_object, "value", None)
-        if value is None:
-            return None
-        return value != 0
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.value != 0
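
The primitive object hunks above drop the getattr/None guards and rely on the Nullable base to dispatch between the null marker and the concrete `to_python_not_null` implementations. A simplified sketch of that dispatch pattern (not pyignite's actual Nullable class; `None` stands in for a parsed null marker):

    from types import SimpleNamespace

    class NullableSketch:
        @classmethod
        def to_python(cls, ctypes_object, *args, **kwargs):
            # null values never reach the not-null handler
            if ctypes_object is None:
                return None
            return cls.to_python_not_null(ctypes_object, *args, **kwargs)

    class BoolObjectSketch(NullableSketch):
        @classmethod
        def to_python_not_null(cls, ctypes_object, *args, **kwargs):
            return ctypes_object.value != 0

    assert BoolObjectSketch.to_python(SimpleNamespace(value=1)) is True
    assert BoolObjectSketch.to_python(None) is None
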
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 4ca6795..5657afb 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -23,6 +23,7 @@ import uuid
 
 from pyignite.constants import *
 from pyignite.utils import datetime_hashcode, decimal_hashcode, hashcode
+from .base import IgniteDataType
 from .type_codes import *
 from .type_ids import *
 from .type_names import *
@@ -100,14 +101,14 @@ class String(Nullable):
         return data_type
 
     @classmethod
-    def to_python_not_null(cls, ctype_object, *args, **kwargs):
-        if ctype_object.length > 0:
-            return ctype_object.data.decode(PROTOCOL_STRING_ENCODING)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        if ctypes_object.length > 0:
+            return ctypes_object.data.decode(PROTOCOL_STRING_ENCODING)
 
         return ''
 
     @classmethod
-    def from_python_not_null(cls, stream, value):
+    def from_python_not_null(cls, stream, value, **kwargs):
         if isinstance(value, str):
             value = value.encode(PROTOCOL_STRING_ENCODING)
         length = len(value)
@@ -135,7 +136,7 @@ class DecimalObject(Nullable):
         return decimal_hashcode(value)
 
     @classmethod
-    def build_c_header(cls):
+    def build_c_type(cls, length):
         return type(
             cls.__name__,
             (ctypes.LittleEndianStructure,),
@@ -145,48 +146,41 @@ class DecimalObject(Nullable):
                     ('type_code', ctypes.c_byte),
                     ('scale', ctypes.c_int),
                     ('length', ctypes.c_int),
-                ],
+                    ('data', ctypes.c_ubyte * length)
+                ]
             }
         )
 
     @classmethod
     def parse_not_null(cls, stream):
-        header_class = cls.build_c_header()
-        header = stream.read_ctype(header_class)
-
-        data_type = type(
-            cls.__name__,
-            (header_class,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('data', ctypes.c_ubyte * header.length),
-                ],
-            }
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + int_sz + b_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
         )
-
+        data_type = cls.build_c_type(length)
         stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
         return data_type
 
     @classmethod
-    def to_python_not_null(cls, ctype_object, *args, **kwargs):
-        sign = 1 if ctype_object.data[0] & 0x80 else 0
-        data = ctype_object.data[1:]
-        data.insert(0, ctype_object.data[0] & 0x7f)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        sign = 1 if ctypes_object.data[0] & 0x80 else 0
+        data = ctypes_object.data[1:]
+        data.insert(0, ctypes_object.data[0] & 0x7f)
         # decode n-byte integer
         result = sum([
             [x for x in reversed(data)][i] * 0x100 ** i for i in
             range(len(data))
         ])
         # apply scale
-        result = result / decimal.Decimal('10') ** decimal.Decimal(ctype_object.scale)
+        result = result / decimal.Decimal('10') ** decimal.Decimal(ctypes_object.scale)
         if sign:
             # apply sign
             result = -result
         return result
 
     @classmethod
-    def from_python_not_null(cls, stream, value: decimal.Decimal):
+    def from_python_not_null(cls, stream, value: decimal.Decimal, **kwargs):
         sign, digits, scale = value.normalize().as_tuple()
         integer = int(''.join([str(d) for d in digits]))
         # calculate number of bytes (at least one, and not forget the sign bit)
@@ -202,17 +196,7 @@ class DecimalObject(Nullable):
             data[0] |= 0x80
         else:
             data[0] &= 0x7f
-        header_class = cls.build_c_header()
-        data_class = type(
-            cls.__name__,
-            (header_class,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('data', ctypes.c_ubyte * length),
-                ],
-            }
-        )
+        data_class = cls.build_c_type(length)
         data_object = data_class()
         data_object.type_code = int.from_bytes(
             cls.type_code,
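
For reference, the decimal wire format handled above keeps the scale in a 4-byte field and stores the unscaled magnitude big-endian, with the sign carried in the top bit of the first magnitude byte. A small decode sketch of just that arithmetic (the helper name is illustrative, not part of the patch):

    import decimal

    def decode_decimal(scale, data):
        # the sign lives in the top bit of the first magnitude byte
        sign = data[0] & 0x80
        magnitude = int.from_bytes(bytes([data[0] & 0x7f]) + bytes(data[1:]), 'big')
        result = decimal.Decimal(magnitude).scaleb(-scale)
        return -result if sign else result

    # -7.25 is stored as magnitude 725 (0x02D5) with scale 2 and the sign bit set on 0x02
    assert decode_decimal(2, b'\x82\xd5') == decimal.Decimal('-7.25')
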
@@ -266,7 +250,7 @@ class UUIDObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python_not_null(cls, stream, value: uuid.UUID):
+    def from_python_not_null(cls, stream, value: uuid.UUID, **kwargs):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -381,7 +365,7 @@ class DateObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python_not_null(cls, stream, value: [date, datetime]):
+    def from_python_not_null(cls, stream, value: [date, datetime], **kwargs):
         if type(value) is date:
             value = datetime.combine(value, time())
         data_type = cls.build_c_type()
@@ -433,7 +417,7 @@ class TimeObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python_not_null(cls, stream, value: timedelta):
+    def from_python_not_null(cls, stream, value: timedelta, **kwargs):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -480,7 +464,7 @@ class EnumObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python_not_null(cls, stream, value: tuple):
+    def from_python_not_null(cls, stream, value: tuple, **kwargs):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -505,84 +489,89 @@ class BinaryEnumObject(EnumObject):
     type_code = TC_BINARY_ENUM
 
 
-class StandardArray(Nullable):
-    """
-    Base class for array of primitives. Payload-only.
-    """
-    _type_name = None
-    _type_id = None
+class _StandardArrayBase:
     standard_type = None
-    type_code = None
 
     @classmethod
-    def build_header_class(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('length', ctypes.c_int),
-                ],
-            }
-        )
+    def _parse_header(cls, stream):
+        raise NotImplementedError
 
     @classmethod
-    def parse_not_null(cls, stream):
-        header_class = cls.build_header_class()
-        header = stream.read_ctype(header_class)
-        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+    def _parse(cls, stream):
+        fields, length = cls._parse_header(stream)
 
-        fields = []
-        for i in range(header.length):
+        for i in range(length):
             c_type = cls.standard_type.parse(stream)
-            fields.append(('element_{}'.format(i), c_type))
+            fields.append((f'element_{i}', c_type))
 
-        final_class = type(
+        return type(
             cls.__name__,
-            (header_class,),
+            (ctypes.LittleEndianStructure,),
             {
                 '_pack_': 1,
                 '_fields_': fields,
             }
         )
-        return final_class
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        length = getattr(ctype_object, "length", None)
-        if length is None:
-            return None
+    def _write_header(cls, stream, value, **kwargs):
+        raise NotImplementedError
 
-        result = []
-        for i in range(length):
-            result.append(
-                cls.standard_type.to_python(
-                    getattr(ctype_object, 'element_{}'.format(i)),
-                    *args, **kwargs
-                )
-            )
-        return result
+    @classmethod
+    def _from_python(cls, stream, value, **kwargs):
+        cls._write_header(stream, value, **kwargs)
+        for x in value:
+            cls.standard_type.from_python(stream, x)
 
     @classmethod
-    async def to_python_async(cls, ctypes_object, *args, **kwargs):
-        return cls.to_python(ctypes_object, *args, **kwargs)
+    def _to_python(cls, ctypes_object, *args, **kwargs):
+        length = ctypes_object.length
+        return [
+            cls.standard_type.to_python(
+                getattr(ctypes_object, f'element_{i}'), *args, **kwargs
+            ) for i in range(length)
+        ]
+
+
+class StandardArray(IgniteDataType, _StandardArrayBase):
+    """
+    Base class for arrays of standard Ignite types. Payload-only.
+    """
+    _type_name = None
+    _type_id = None
+    type_code = None
 
     @classmethod
-    def from_python_not_null(cls, stream, value, **kwargs):
-        header_class = cls.build_header_class()
-        header = header_class()
-        if hasattr(header, 'type_code'):
-            header.type_code = int.from_bytes(
-                cls.type_code,
+    def _parse_header(cls, stream):
+        int_sz = ctypes.sizeof(ctypes.c_int)
+        length = int.from_bytes(
+            stream.slice(stream.tell(), int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(int_sz, SEEK_CUR)
+
+        return [('length', ctypes.c_int)], length
+
+    @classmethod
+    def parse(cls, stream):
+        return cls._parse(stream)
+
+    @classmethod
+    def _write_header(cls, stream, value, **kwargs):
+        stream.write(
+            len(value).to_bytes(
+                length=ctypes.sizeof(ctypes.c_int),
                 byteorder=PROTOCOL_BYTE_ORDER
             )
-        length = len(value)
-        header.length = length
+        )
 
-        stream.write(header)
-        for x in value:
-            cls.standard_type.from_python(stream, x)
+    @classmethod
+    def from_python(cls, stream, value, **kwargs):
+        cls._from_python(stream, value, **kwargs)
+
+    @classmethod
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        return cls._to_python(ctypes_object, *args, **kwargs)
 
 
 class StringArray(StandardArray):
@@ -633,26 +622,47 @@ class EnumArray(StandardArray):
     standard_type = EnumObject
 
 
-class StandardArrayObject(StandardArray):
+class StandardArrayObject(Nullable, _StandardArrayBase):
     _type_name = None
     _type_id = None
+    standard_type = None
+    type_code = None
     pythonic = list
     default = []
 
     @classmethod
-    def build_header_class(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('length', ctypes.c_int),
-                ],
-            }
+    def _parse_header(cls, stream):
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(int_sz + b_sz, SEEK_CUR)
+
+        return [('type_code', ctypes.c_byte), ('length', ctypes.c_int)], length
+
+    @classmethod
+    def parse_not_null(cls, stream):
+        return cls._parse(stream)
+
+    @classmethod
+    def _write_header(cls, stream, value, **kwargs):
+        stream.write(cls.type_code)
+        stream.write(
+            len(value).to_bytes(
+                length=ctypes.sizeof(ctypes.c_int),
+                byteorder=PROTOCOL_BYTE_ORDER
+            )
         )
 
+    @classmethod
+    def from_python_not_null(cls, stream, value, **kwargs):
+        cls._from_python(stream, value, **kwargs)
+
+    @classmethod
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return cls._to_python(ctypes_object, *args, **kwargs)
+
 
 class StringArrayObject(StandardArrayObject):
     """ List of strings. """
@@ -714,45 +724,43 @@ class EnumArrayObject(StandardArrayObject):
     standard_type = EnumObject
     type_code = TC_ENUM_ARRAY
 
+    OBJECT = -1
+
     @classmethod
-    def build_header_class(cls):
-        return type(
-            cls.__name__ + 'Header',
-            (ctypes.LittleEndianStructure,),
-            {
-                '_pack_': 1,
-                '_fields_': [
-                    ('type_code', ctypes.c_byte),
-                    ('type_id', ctypes.c_int),
-                    ('length', ctypes.c_int),
-                ],
-            }
+    def _parse_header(cls, stream):
+        int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+        length = int.from_bytes(
+            stream.slice(stream.tell() + b_sz + int_sz, int_sz),
+            byteorder=PROTOCOL_BYTE_ORDER
         )
+        stream.seek(2 * int_sz + b_sz, SEEK_CUR)
+        return [('type_code', ctypes.c_byte), ('type_id', ctypes.c_int), ('length', ctypes.c_int)], length
 
     @classmethod
-    def from_python_not_null(cls, stream, value, **kwargs):
-        type_id, value = value
-        header_class = cls.build_header_class()
-        header = header_class()
-        if hasattr(header, 'type_code'):
-            header.type_code = int.from_bytes(
-                cls.type_code,
+    def _write_header(cls, stream, value, type_id=-1):
+        stream.write(cls.type_code)
+        stream.write(
+            type_id.to_bytes(
+                length=ctypes.sizeof(ctypes.c_int),
+                byteorder=PROTOCOL_BYTE_ORDER,
+                signed=True
+            )
+        )
+        stream.write(
+            len(value).to_bytes(
+                length=ctypes.sizeof(ctypes.c_int),
                 byteorder=PROTOCOL_BYTE_ORDER
             )
-        length = len(value)
-        header.length = length
-        header.type_id = type_id
+        )
 
-        stream.write(header)
-        for x in value:
-            cls.standard_type.from_python(stream, x)
+    @classmethod
+    def from_python_not_null(cls, stream, value, **kwargs):
+        type_id, value = value
+        super().from_python_not_null(stream, value, type_id=type_id)
 
     @classmethod
-    def to_python_not_null(cls, ctype_object, *args, **kwargs):
-        type_id = getattr(ctype_object, "type_id", None)
-        if type_id is None:
-            return None
-        return type_id, super().to_python(ctype_object, *args, **kwargs)
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        return ctypes_object.type_id, cls._to_python(ctypes_object, *args, **kwargs)
 
 
 class BinaryEnumArrayObject(EnumArrayObject):
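
EnumArrayObject above carries an extra signed `type_id` between the type code and the element count, and its Python representation is a `(type_id, values)` tuple, as `from_python_not_null`/`to_python_not_null` show. A sketch of just the header layout (the type-code byte is a placeholder, not taken from this patch):

    import ctypes

    INT_SZ = ctypes.sizeof(ctypes.c_int)

    def enum_array_header(type_code, type_id, count):
        # type code, signed array-level type id (-1 when unspecified), then the count
        return (type_code
                + type_id.to_bytes(INT_SZ, 'little', signed=True)
                + count.to_bytes(INT_SZ, 'little'))

    header = enum_array_header(b'\x1d', -1, 3)
    assert int.from_bytes(header[1:1 + INT_SZ], 'little', signed=True) == -1
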
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index f0338e1..c0311ec 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -128,25 +128,25 @@ class Response:
             c_type = await ignite_type.parse_async(stream)
             fields.append((name, c_type))
 
-    def to_python(self, ctype_object, *args, **kwargs):
+    def to_python(self, ctypes_object, *args, **kwargs):
         if not self.following:
             return None
 
         result = OrderedDict()
         for name, c_type in self.following:
             result[name] = c_type.to_python(
-                getattr(ctype_object, name),
+                getattr(ctypes_object, name),
                 *args, **kwargs
             )
 
         return result
 
-    async def to_python_async(self, ctype_object, *args, **kwargs):
+    async def to_python_async(self, ctypes_object, *args, **kwargs):
         if not self.following:
             return None
 
         values = await asyncio.gather(
-            *[c_type.to_python_async(getattr(ctype_object, name), *args, **kwargs) for name, c_type in self.following]
+            *[c_type.to_python_async(getattr(ctypes_object, name), *args, **kwargs) for name, c_type in self.following]
         )
 
         return OrderedDict([(name, values[i]) for i, (name, _) in enumerate(self.following)])
@@ -239,13 +239,13 @@ class SQLResponse(Response):
             ('more', ctypes.c_byte),
         ]
 
-    def to_python(self, ctype_object, *args, **kwargs):
-        if getattr(ctype_object, 'status_code', 0) == 0:
-            result = self.__to_python_result_header(ctype_object, *args, **kwargs)
+    def to_python(self, ctypes_object, *args, **kwargs):
+        if getattr(ctypes_object, 'status_code', 0) == 0:
+            result = self.__to_python_result_header(ctypes_object, *args, **kwargs)
 
-            for row_item in ctype_object.data._fields_:
+            for row_item in ctypes_object.data._fields_:
                 row_name = row_item[0]
-                row_object = getattr(ctype_object.data, row_name)
+                row_object = getattr(ctypes_object.data, row_name)
                 row = []
                 for col_item in row_object._fields_:
                     col_name = col_item[0]
@@ -254,14 +254,14 @@ class SQLResponse(Response):
                 result['data'].append(row)
             return result
 
-    async def to_python_async(self, ctype_object, *args, **kwargs):
-        if getattr(ctype_object, 'status_code', 0) == 0:
-            result = self.__to_python_result_header(ctype_object, *args, **kwargs)
+    async def to_python_async(self, ctypes_object, *args, **kwargs):
+        if getattr(ctypes_object, 'status_code', 0) == 0:
+            result = self.__to_python_result_header(ctypes_object, *args, **kwargs)
 
             data_coro = []
-            for row_item in ctype_object.data._fields_:
+            for row_item in ctypes_object.data._fields_:
                 row_name = row_item[0]
-                row_object = getattr(ctype_object.data, row_name)
+                row_object = getattr(ctypes_object.data, row_name)
                 row_coro = []
                 for col_item in row_object._fields_:
                     col_name = col_item[0]
@@ -274,18 +274,18 @@ class SQLResponse(Response):
             return result
 
     @staticmethod
-    def __to_python_result_header(ctype_object, *args, **kwargs):
+    def __to_python_result_header(ctypes_object, *args, **kwargs):
         result = {
-            'more': Bool.to_python(ctype_object.more, *args, **kwargs),
+            'more': Bool.to_python(ctypes_object.more, *args, **kwargs),
             'data': [],
         }
-        if hasattr(ctype_object, 'fields'):
-            result['fields'] = StringArray.to_python(ctype_object.fields, *args, **kwargs)
+        if hasattr(ctypes_object, 'fields'):
+            result['fields'] = StringArray.to_python(ctypes_object.fields, *args, **kwargs)
         else:
-            result['field_count'] = Int.to_python(ctype_object.field_count, *args, **kwargs)
+            result['field_count'] = Int.to_python(ctypes_object.field_count, *args, **kwargs)
 
-        if hasattr(ctype_object, 'cursor'):
-            result['cursor'] = Long.to_python(ctype_object.cursor, *args, **kwargs)
+        if hasattr(ctypes_object, 'cursor'):
+            result['cursor'] = Long.to_python(ctypes_object.cursor, *args, **kwargs)
         return result
 
 
@@ -328,26 +328,26 @@ class BinaryTypeResponse(Response):
 
         return type_exists
 
-    def to_python(self, ctype_object, *args, **kwargs):
-        if getattr(ctype_object, 'status_code', 0) == 0:
+    def to_python(self, ctypes_object, *args, **kwargs):
+        if getattr(ctypes_object, 'status_code', 0) == 0:
             result = {
-                'type_exists': Bool.to_python(ctype_object.type_exists)
+                'type_exists': Bool.to_python(ctypes_object.type_exists)
             }
 
-            if hasattr(ctype_object, 'body'):
-                result.update(body_struct.to_python(ctype_object.body))
+            if hasattr(ctypes_object, 'body'):
+                result.update(body_struct.to_python(ctypes_object.body))
 
-            if hasattr(ctype_object, 'enums'):
-                result['enums'] = enum_struct.to_python(ctype_object.enums)
+            if hasattr(ctypes_object, 'enums'):
+                result['enums'] = enum_struct.to_python(ctypes_object.enums)
 
-            if hasattr(ctype_object, 'schema'):
+            if hasattr(ctypes_object, 'schema'):
                 result['schema'] = {
                     x['schema_id']: [
                         z['schema_field_id'] for z in x['schema_fields']
                     ]
-                    for x in schema_struct.to_python(ctype_object.schema)
+                    for x in schema_struct.to_python(ctypes_object.schema)
                 }
             return result
 
-    async def to_python_async(self, ctype_object, *args, **kwargs):
-        return self.to_python(ctype_object, *args, **kwargs)
+    async def to_python_async(self, ctypes_object, *args, **kwargs):
+        return self.to_python(ctypes_object, *args, **kwargs)
diff --git a/tests/common/test_datatypes.py b/tests/common/test_datatypes.py
index c1aa19f..6771f94 100644
--- a/tests/common/test_datatypes.py
+++ b/tests/common/test_datatypes.py
@@ -50,6 +50,7 @@ put_get_data_params = [
 
     # arrays of integers
     ([1, 2, 3, 5], None),
+    (b'buzz', None),
     (b'buzz', ByteArrayObject),
     (bytearray([7, 8, 8, 11]), None),
     (bytearray([7, 8, 8, 11]), ByteArrayObject),
@@ -122,7 +123,7 @@ put_get_data_params = [
     ((-1, [(6001, 1), (6002, 2), (6003, 3)]), BinaryEnumArrayObject),
 
     # object array
-    ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3')]), ObjectArrayObject),
+    ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3'), bytearray(b'\x10\x20')]), ObjectArrayObject),
 
     # collection
     ((CollectionObject.LINKED_LIST, [1, 2, 3]), None),
@@ -153,42 +154,47 @@ async def test_put_get_data_async(async_cache, value, value_hint):
 
 
 bytearray_params = [
-    [1, 2, 3, 5],
-    (7, 8, 13, 18),
-    (-128, -1, 0, 1, 127, 255),
+    ([1, 2, 3, 5], ByteArrayObject),
+    ((7, 8, 13, 18), ByteArrayObject),
+    ((-128, -1, 0, 1, 127, 255), ByteArrayObject),
+    (b'\x01\x03\x10', None),
+    (bytearray(b'\x01\x30'), None)
 ]
 
 
 @pytest.mark.parametrize(
-    'value',
+    'value,type_hint',
     bytearray_params
 )
-def test_bytearray_from_list_or_tuple(cache, value):
+def test_bytearray_from_different_input(cache, value, type_hint):
     """
     ByteArrayObject's pythonic type is `bytearray`, but it should also accept
-    lists or tuples as a content.
+    lists or tuples as content.
     """
-
-    cache.put('my_key', value, value_hint=ByteArrayObject)
-
-    assert cache.get('my_key') == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
+    cache.put('my_key', value, value_hint=type_hint)
+    __check_bytearray_from_different_input(cache.get('my_key'), value)
 
 
 @pytest.mark.parametrize(
-    'value',
+    'value,type_hint',
     bytearray_params
 )
 @pytest.mark.asyncio
-async def test_bytearray_from_list_or_tuple_async(async_cache, value):
+async def test_bytearray_from_different_input_async(async_cache, value, type_hint):
     """
     ByteArrayObject's pythonic type is `bytearray`, but it should also accept
-    lists or tuples as a content.
+    lists or tuples as content.
     """
-
-    await async_cache.put('my_key', value, value_hint=ByteArrayObject)
+    await async_cache.put('my_key', value, value_hint=type_hint)
+    __check_bytearray_from_different_input(await async_cache.get('my_key'), value)
+
 
-    result = await async_cache.get('my_key')
-    assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
+def __check_bytearray_from_different_input(result, value):
+    if isinstance(value, (bytes, bytearray)):
+        assert isinstance(result, bytes)
+        assert value == result
+    else:
+        assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
 
 
 uuid_params = [
diff --git a/tests/common/test_key_value.py b/tests/common/test_key_value.py
index 6e6df61..b03bec2 100644
--- a/tests/common/test_key_value.py
+++ b/tests/common/test_key_value.py
@@ -422,10 +422,13 @@ async def test_put_get_collection_async(async_cache, key, hinted_value, value):
 @pytest.fixture
 def complex_map():
     return {"test" + str(i): ((MapObject.HASH_MAP,
-                               {"key_1": ((1, ["value_1", 1.0]), CollectionObject),
-                                "key_2": ((1, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]), CollectionObject),
-                                "key_3": ((1, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]), CollectionObject),
-                                "key_4": ((1, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]), CollectionObject),
+                               {"key_1": ((CollectionObject.ARR_LIST, ["value_1", 1.0]), CollectionObject),
+                                "key_2": ((CollectionObject.ARR_LIST, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]),
+                                          CollectionObject),
+                                "key_3": ((CollectionObject.ARR_LIST, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]),
+                                          CollectionObject),
+                                "key_4": ((CollectionObject.ARR_LIST, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]),
+                                          CollectionObject),
                                 'key_5': False,
                                 "key_6": "value_6"}), MapObject) for i in range(10000)}
 
diff --git a/tests/common/test_sql.py b/tests/common/test_sql.py
index 0841b7f..b947fbc 100644
--- a/tests/common/test_sql.py
+++ b/tests/common/test_sql.py
@@ -325,3 +325,131 @@ def __check_query_with_cache(client, cache_fixture):
                 assert test_value == received
 
     return async_inner() if isinstance(cache, AioCache) else inner()
+
+
+VARBIN_CREATE_QUERY = 'CREATE TABLE VarbinTable(id int primary key, varbin VARBINARY)'
+VARBIN_DROP_QUERY = 'DROP TABLE VarbinTable'
+VARBIN_MERGE_QUERY = 'MERGE INTO VarbinTable(id, varbin) VALUES (?, ?)'
+VARBIN_SELECT_QUERY = 'SELECT * FROM VarbinTable'
+
+VARBIN_TEST_PARAMS = [
+    bytearray('Test message', 'UTF-8'),
+    bytes('Test message', 'UTF-8')
+]
+
+
+@pytest.fixture
+def varbin_table(client):
+    client.sql(VARBIN_CREATE_QUERY)
+    yield None
+    client.sql(VARBIN_DROP_QUERY)
+
+
+@pytest.mark.parametrize(
+    'value', VARBIN_TEST_PARAMS
+)
+def test_sql_cache_varbinary_handling(client, varbin_table, value):
+    client.sql(VARBIN_MERGE_QUERY, query_args=(1, value))
+    with client.sql(VARBIN_SELECT_QUERY) as cursor:
+        for row in cursor:
+            assert isinstance(row[1], bytes)
+            assert row[1] == value
+            break
+
+
+@pytest.fixture
+async def varbin_table_async(async_client):
+    await async_client.sql(VARBIN_CREATE_QUERY)
+    yield None
+    await async_client.sql(VARBIN_DROP_QUERY)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    'value', VARBIN_TEST_PARAMS
+)
+async def test_sql_cache_varbinary_handling_async(async_client, varbin_table_async, value):
+    await async_client.sql(VARBIN_MERGE_QUERY, query_args=(1, value))
+    async with async_client.sql(VARBIN_SELECT_QUERY) as cursor:
+        async for row in cursor:
+            assert isinstance(row[1], bytes)
+            assert row[1] == value
+            break
+
+
+@pytest.fixture
+def varbin_cache_settings():
+    cache_name = 'varbin_cache'
+    table_name = f'{cache_name}_table'.upper()
+
+    yield {
+        PROP_NAME: cache_name,
+        PROP_SQL_SCHEMA: 'PUBLIC',
+        PROP_CACHE_MODE: CacheMode.PARTITIONED,
+        PROP_QUERY_ENTITIES: [
+            {
+                'table_name': table_name,
+                'key_field_name': 'ID',
+                'value_field_name': 'VALUE',
+                'key_type_name': 'java.lang.Long',
+                'value_type_name': 'byte[]',
+                'query_indexes': [],
+                'field_name_aliases': [],
+                'query_fields': [
+                    {
+                        'name': 'ID',
+                        'type_name': 'java.lang.Long',
+                        'is_key_field': True,
+                        'is_notnull_constraint_field': True,
+                    },
+                    {
+                        'name': 'VALUE',
+                        'type_name': 'byte[]',
+                    },
+                ],
+            },
+        ],
+    }
+
+
+VARBIN_CACHE_TABLE_NAME = 'varbin_cache_table'.upper()
+VARBIN_CACHE_SELECT_QUERY = f'SELECT * FROM {VARBIN_CACHE_TABLE_NAME}'
+
+
+@pytest.fixture
+def varbin_cache(client, varbin_cache_settings):
+    cache = client.get_or_create_cache(varbin_cache_settings)
+    yield cache
+    cache.destroy()
+
+
+@pytest.mark.parametrize(
+    'value', VARBIN_TEST_PARAMS
+)
+def test_cache_varbinary_handling(client, varbin_cache, value):
+    varbin_cache.put(1, value)
+    with client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor:
+        for row in cursor:
+            assert isinstance(row[1], bytes)
+            assert row[1] == value
+            break
+
+
+@pytest.fixture
+async def varbin_cache_async(async_client, varbin_cache_settings):
+    cache = await async_client.get_or_create_cache(varbin_cache_settings)
+    yield cache
+    await cache.destroy()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    'value', VARBIN_TEST_PARAMS
+)
+async def test_cache_varbinary_handling_async(async_client, varbin_cache_async, value):
+    await varbin_cache_async.put(1, value)
+    async with async_client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor:
+        async for row in cursor:
+            assert isinstance(row[1], bytes)
+            assert row[1] == value
+            break
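
Taken together, the tests above pin down the new user-facing behavior: `bytes` and `bytearray` values round-trip as immutable `bytes`, both through the key-value API and through SQL VARBINARY columns. A hypothetical client-side round trip against a locally running node (host, port and cache name are illustrative):

    from pyignite import Client
    from pyignite.datatypes import ByteArrayObject

    client = Client()
    client.connect('127.0.0.1', 10800)
    try:
        cache = client.get_or_create_cache('bytes_demo')
        cache.put('k1', b'\x01\x02\x03')                        # bytes map to a byte array
        cache.put('k2', [1, 2, 3], value_hint=ByteArrayObject)  # lists still need an explicit hint
        assert isinstance(cache.get('k1'), bytes)               # and come back as immutable bytes
    finally:
        client.close()
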