You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2021/04/12 09:51:17 UTC
[ignite-python-thin-client] branch master updated: IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 3586db7 IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.
3586db7 is described below
commit 3586db7c30e22aff61d2251fec1ffaa355b4c599
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Mon Apr 12 12:50:49 2021 +0300
IGNITE-14511 Fix serialization of bytes, improve serialization-deserialization of collections. - Fixes #30.
---
pyignite/binary.py | 2 +-
pyignite/datatypes/base.py | 6 +-
pyignite/datatypes/cache_properties.py | 14 +-
pyignite/datatypes/complex.py | 595 +++++++++++++++-----------------
pyignite/datatypes/internal.py | 199 +++++------
pyignite/datatypes/primitive.py | 14 +-
pyignite/datatypes/primitive_arrays.py | 142 ++++----
pyignite/datatypes/primitive_objects.py | 21 +-
pyignite/datatypes/standard.py | 280 +++++++--------
pyignite/queries/response.py | 64 ++--
tests/common/test_datatypes.py | 36 +-
tests/common/test_key_value.py | 11 +-
tests/common/test_sql.py | 128 +++++++
13 files changed, 783 insertions(+), 729 deletions(-)
diff --git a/pyignite/binary.py b/pyignite/binary.py
index 5a5f895..551f1d0 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -151,7 +151,7 @@ class GenericObjectMeta(GenericObjectPropsMeta):
write_footer(self, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf)
def write_header(obj, stream):
- header_class = BinaryObject.build_header()
+ header_class = BinaryObject.get_header_class()
header = header_class()
header.type_code = int.from_bytes(
BinaryObject.type_code,
diff --git a/pyignite/datatypes/base.py b/pyignite/datatypes/base.py
index fbd798b..87b251c 100644
--- a/pyignite/datatypes/base.py
+++ b/pyignite/datatypes/base.py
@@ -72,9 +72,9 @@ class IgniteDataType(metaclass=IgniteDataTypeMeta):
cls.from_python(stream, value, **kwargs)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
+ def to_python(cls, ctypes_object, *args, **kwargs):
raise NotImplementedError
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- return cls.to_python(ctype_object, *args, **kwargs)
+ async def to_python_async(cls, ctypes_object, *args, **kwargs):
+ return cls.to_python(ctypes_object, *args, **kwargs)
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index d924507..9bf34de 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -115,12 +115,12 @@ class PropBase:
return cls.parse(stream)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return cls.prop_data_class.to_python(ctype_object.data, *args, **kwargs)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- return cls.to_python(ctype_object, *args, **kwargs)
+ async def to_python_async(cls, ctypes_object, *args, **kwargs):
+ return cls.to_python(ctypes_object, *args, **kwargs)
@classmethod
def from_python(cls, stream, value):
@@ -295,6 +295,6 @@ class AnyProperty(PropBase):
)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- prop_data_class = prop_map(ctype_object.prop_code)
- return prop_data_class.to_python(ctype_object.data, *args, **kwargs)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ prop_data_class = prop_map(ctypes_object.prop_code)
+ return prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index 5cb6160..119c552 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -20,6 +20,7 @@ from typing import Optional
from pyignite.constants import *
from pyignite.exceptions import ParseError
+from .base import IgniteDataType
from .internal import AnyDataObject, Struct, infer_from_python, infer_from_python_async
from .type_codes import *
from .type_ids import *
@@ -41,122 +42,100 @@ class ObjectArrayObject(Nullable):
_type_name = NAME_OBJ_ARR
_type_id = TYPE_OBJ_ARR
+ _fields = [
+ ('type_code', ctypes.c_byte),
+ ('type_id', ctypes.c_int),
+ ('length', ctypes.c_int)
+ ]
type_code = TC_OBJECT_ARRAY
@classmethod
- def build_header(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('type_id', ctypes.c_int),
- ('length', ctypes.c_int),
- ],
- }
- )
-
- @classmethod
def parse_not_null(cls, stream):
- header, header_class = cls.__parse_header(stream)
+ length, fields = cls.__get_length(stream), []
- fields = []
- for i in range(header.length):
+ for i in range(length):
c_type = AnyDataObject.parse(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return cls.__build_final_class(header_class, fields)
+ return cls.__build_final_class(fields)
@classmethod
async def parse_not_null_async(cls, stream):
- header, header_class = cls.__parse_header(stream)
-
- fields = []
- for i in range(header.length):
+ length, fields = cls.__get_length(stream), []
+ for i in range(length):
c_type = await AnyDataObject.parse_async(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return cls.__build_final_class(header_class, fields)
+ return cls.__build_final_class(fields)
@classmethod
- def __parse_header(cls, stream):
- header_class = cls.build_header()
- header = stream.read_ctype(header_class)
- stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
- return header, header_class
+ def __get_length(cls, stream):
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz + int_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(2 * int_sz + b_sz, SEEK_CUR)
+ return length
@classmethod
- def __build_final_class(cls, header_class, fields):
+ def __build_final_class(cls, fields):
return type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
- '_fields_': fields,
+ '_fields_': cls._fields + fields,
}
)
@classmethod
- def to_python_not_null(cls, ctype_object, *args, **kwargs):
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
result = []
- for i in range(ctype_object.length):
+ for i in range(ctypes_object.length):
result.append(
AnyDataObject.to_python(
- getattr(ctype_object, 'element_{}'.format(i)),
+ getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
)
)
- return ctype_object.type_id, result
+ return ctypes_object.type_id, result
@classmethod
- async def to_python_not_null_async(cls, ctype_object, *args, **kwargs):
+ async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
result = [
await AnyDataObject.to_python_async(
- getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs
+ getattr(ctypes_object, f'element_{i}'), *args, **kwargs
)
- for i in range(ctype_object.length)]
- return ctype_object.type_id, result
+ for i in range(ctypes_object.length)]
+ return ctypes_object.type_id, result
@classmethod
def from_python_not_null(cls, stream, value, *args, **kwargs):
- type_or_id, value = value
- try:
- length = len(value)
- except TypeError:
- value = [value]
- length = 1
-
- cls.__write_header(stream, type_or_id, length)
+ value = cls.__write_header(stream, value)
for x in value:
infer_from_python(stream, x)
@classmethod
async def from_python_not_null_async(cls, stream, value, *args, **kwargs):
- type_or_id, value = value
+ value = cls.__write_header(stream, value)
+ for x in value:
+ await infer_from_python_async(stream, x)
+
+ @classmethod
+ def __write_header(cls, stream, value):
+ type_id, value = value
try:
length = len(value)
except TypeError:
value = [value]
length = 1
- cls.__write_header(stream, type_or_id, length)
- for x in value:
- await infer_from_python_async(stream, x)
+ stream.write(cls.type_code)
+ stream.write(type_id.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+ stream.write(length.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
- @classmethod
- def __write_header(cls, stream, type_or_id, length):
- header_class = cls.build_header()
- header = header_class()
- header.type_code = int.from_bytes(
- cls.type_code,
- byteorder=PROTOCOL_BYTE_ORDER
- )
- header.length = length
- header.type_id = type_or_id
-
- stream.write(header)
+ return value
class WrappedDataObject(Nullable):
@@ -171,31 +150,22 @@ class WrappedDataObject(Nullable):
type_code = TC_ARRAY_WRAPPED_OBJECTS
@classmethod
- def build_header(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('length', ctypes.c_int),
- ],
- }
- )
-
- @classmethod
def parse_not_null(cls, stream):
- header_class = cls.build_header()
- header = stream.read_ctype(header_class)
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
final_class = type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': [
- ('payload', ctypes.c_byte * header.length),
+ ('type_code', ctypes.c_byte),
+ ('length', ctypes.c_int),
+ ('payload', ctypes.c_byte * length),
('offset', ctypes.c_int),
],
}
@@ -205,11 +175,11 @@ class WrappedDataObject(Nullable):
return final_class
@classmethod
- def to_python_not_null(cls, ctype_object, *args, **kwargs):
- return bytes(ctype_object.payload), ctype_object.offset
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return bytes(ctypes_object.payload), ctypes_object.offset
@classmethod
- def from_python(cls, stream, value, *args, **kwargs):
+ def from_python_not_null(cls, stream, value, *args, **kwargs):
raise ParseError('Send unwrapped data.')
@@ -251,59 +221,47 @@ class CollectionObject(Nullable):
_type_name = NAME_COL
_type_id = TYPE_COL
+ _header_class = None
type_code = TC_COLLECTION
pythonic = list
default = []
@classmethod
- def build_header(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('length', ctypes.c_int),
- ('type', ctypes.c_byte),
- ],
- }
- )
-
- @classmethod
def parse_not_null(cls, stream):
- header, header_class = cls.__parse_header(stream)
+ fields, length = cls.__parse_header(stream)
- fields = []
- for i in range(header.length):
+ for i in range(length):
c_type = AnyDataObject.parse(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return cls.__build_final_class(header_class, fields)
+ return cls.__build_final_class(fields)
@classmethod
async def parse_not_null_async(cls, stream):
- header, header_class = cls.__parse_header(stream)
+ fields, length = cls.__parse_header(stream)
- fields = []
- for i in range(header.length):
+ for i in range(length):
c_type = await AnyDataObject.parse_async(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return cls.__build_final_class(header_class, fields)
+ return cls.__build_final_class(fields)
@classmethod
def __parse_header(cls, stream):
- header_class = cls.build_header()
- header = stream.read_ctype(header_class)
- stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
- return header, header_class
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ header_fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)]
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(int_sz + 2 * b_sz, SEEK_CUR)
+ return header_fields, length
@classmethod
- def __build_final_class(cls, header_class, fields):
+ def __build_final_class(cls, fields):
return type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': fields,
@@ -311,134 +269,91 @@ class CollectionObject(Nullable):
)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- length = cls.__get_length(ctype_object)
- if length is None:
- return None
-
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
result = [
- AnyDataObject.to_python(getattr(ctype_object, f'element_{i}'), *args, **kwargs)
- for i in range(length)
+ AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
+ for i in range(ctypes_object.length)
]
- return ctype_object.type, result
+ return ctypes_object.type, result
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- length = cls.__get_length(ctype_object)
- if length is None:
- return None
-
+ async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
result_coro = [
- AnyDataObject.to_python_async(getattr(ctype_object, f'element_{i}'), *args, **kwargs)
- for i in range(length)
+ AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
+ for i in range(ctypes_object.length)
]
- return ctype_object.type, await asyncio.gather(*result_coro)
-
- @classmethod
- def __get_length(cls, ctype_object):
- return getattr(ctype_object, "length", None)
+ return ctypes_object.type, await asyncio.gather(*result_coro)
@classmethod
def from_python_not_null(cls, stream, value, *args, **kwargs):
- type_or_id, value = value
+ type_id, value = value
try:
length = len(value)
except TypeError:
value = [value]
length = 1
- cls.__write_header(stream, type_or_id, length)
+ cls.__write_header(stream, type_id, length)
for x in value:
infer_from_python(stream, x)
@classmethod
async def from_python_not_null_async(cls, stream, value, *args, **kwargs):
- type_or_id, value = value
+ type_id, value = value
try:
length = len(value)
except TypeError:
value = [value]
length = 1
- cls.__write_header(stream, type_or_id, length)
+ cls.__write_header(stream, type_id, length)
for x in value:
await infer_from_python_async(stream, x)
@classmethod
- def __write_header(cls, stream, type_or_id, length):
- header_class = cls.build_header()
- header = header_class()
- header.type_code = int.from_bytes(
- cls.type_code,
- byteorder=PROTOCOL_BYTE_ORDER
+ def __write_header(cls, stream, type_id, length):
+ stream.write(cls.type_code)
+ stream.write(length.to_bytes(
+ ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER
+ ))
+ stream.write(type_id.to_bytes(
+ length=ctypes.sizeof(ctypes.c_byte),
+ byteorder=PROTOCOL_BYTE_ORDER,
+ signed=True)
)
- header.length = length
- header.type = type_or_id
-
- stream.write(header)
-
-class Map(Nullable):
- """
- Dictionary type, payload-only.
-
- Ignite does not track the order of key-value pairs in its caches, hence
- the ordinary Python dict type, not the collections.OrderedDict.
- """
- _type_name = NAME_MAP
- _type_id = TYPE_MAP
+class _MapBase:
HASH_MAP = 1
LINKED_HASH_MAP = 2
@classmethod
- def build_header(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ],
- }
- )
+ def _parse_header(cls, stream):
+ raise NotImplementedError
@classmethod
- def parse_not_null(cls, stream):
- header, header_class = cls.__parse_header(stream)
-
- fields = []
- for i in range(header.length << 1):
+ def _parse(cls, stream):
+ fields, length = cls._parse_header(stream)
+ for i in range(length << 1):
c_type = AnyDataObject.parse(stream)
- fields.append(('element_{}'.format(i), c_type))
-
- return cls.__build_final_class(header_class, fields)
+ fields.append((f'element_{i}', c_type))
+ return cls.__build_final_class(fields)
@classmethod
- async def parse_not_null_async(cls, stream):
- header, header_class = cls.__parse_header(stream)
-
- fields = []
- for i in range(header.length << 1):
+ async def _parse_async(cls, stream):
+ fields, length = cls._parse_header(stream)
+ for i in range(length << 1):
c_type = await AnyDataObject.parse_async(stream)
- fields.append(('element_{}'.format(i), c_type))
-
- return cls.__build_final_class(header_class, fields)
+ fields.append((f'element_{i}', c_type))
- @classmethod
- def __parse_header(cls, stream):
- header_class = cls.build_header()
- header = stream.read_ctype(header_class)
- stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
- return header, header_class
+ return cls.__build_final_class(fields)
@classmethod
- def __build_final_class(cls, header_class, fields):
+ def __build_final_class(cls, fields):
return type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': fields,
@@ -446,76 +361,118 @@ class Map(Nullable):
)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- map_cls = cls.__get_map_class(ctype_object)
+ def _to_python(cls, ctypes_object, *args, **kwargs):
+ map_cls = cls.__get_map_class(ctypes_object)
result = map_cls()
- for i in range(0, ctype_object.length << 1, 2):
+ for i in range(0, ctypes_object.length << 1, 2):
k = AnyDataObject.to_python(
- getattr(ctype_object, 'element_{}'.format(i)),
+ getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
)
v = AnyDataObject.to_python(
- getattr(ctype_object, 'element_{}'.format(i + 1)),
+ getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
)
result[k] = v
return result
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- map_cls = cls.__get_map_class(ctype_object)
+ async def _to_python_async(cls, ctypes_object, *args, **kwargs):
+ map_cls = cls.__get_map_class(ctypes_object)
kv_pairs_coro = [
asyncio.gather(
AnyDataObject.to_python_async(
- getattr(ctype_object, 'element_{}'.format(i)),
+ getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
),
AnyDataObject.to_python_async(
- getattr(ctype_object, 'element_{}'.format(i + 1)),
+ getattr(ctypes_object, f'element_{i + 1}'),
*args, **kwargs
)
- ) for i in range(0, ctype_object.length << 1, 2)
+ ) for i in range(0, ctypes_object.length << 1, 2)
]
return map_cls(await asyncio.gather(*kv_pairs_coro))
@classmethod
- def __get_map_class(cls, ctype_object):
- map_type = getattr(ctype_object, 'type', cls.HASH_MAP)
+ def __get_map_class(cls, ctypes_object):
+ map_type = getattr(ctypes_object, 'type', cls.HASH_MAP)
return OrderedDict if map_type == cls.LINKED_HASH_MAP else dict
@classmethod
- def from_python(cls, stream, value, type_id=None):
- cls.__write_header(stream, type_id, len(value))
+ def _from_python(cls, stream, value, type_id=None):
+ cls._write_header(stream, type_id, len(value))
for k, v in value.items():
infer_from_python(stream, k)
infer_from_python(stream, v)
@classmethod
- async def from_python_async(cls, stream, value, type_id=None):
- cls.__write_header(stream, type_id, len(value))
+ async def _from_python_async(cls, stream, value, type_id):
+ cls._write_header(stream, type_id, len(value))
for k, v in value.items():
await infer_from_python_async(stream, k)
await infer_from_python_async(stream, v)
@classmethod
- def __write_header(cls, stream, type_id, length):
- header_class = cls.build_header()
- header = header_class()
- header.length = length
+ def _write_header(cls, stream, type_id, length):
+ raise NotImplementedError
- if hasattr(header, 'type_code'):
- header.type_code = int.from_bytes(cls.type_code, byteorder=PROTOCOL_BYTE_ORDER)
- if hasattr(header, 'type'):
- header.type = type_id
+class Map(IgniteDataType, _MapBase):
+ """
+ Dictionary type, payload-only.
- stream.write(header)
+ Ignite does not track the order of key-value pairs in its caches, hence
+ the ordinary Python dict type, not the collections.OrderedDict.
+ """
+ _type_name = NAME_MAP
+ _type_id = TYPE_MAP
+
+ @classmethod
+ def parse(cls, stream):
+ return cls._parse(stream)
+
+ @classmethod
+ async def parse_async(cls, stream):
+ return await cls._parse_async(stream)
+
+ @classmethod
+ def _parse_header(cls, stream):
+ int_sz = ctypes.sizeof(ctypes.c_int)
+ length = int.from_bytes(
+ stream.slice(stream.tell(), int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(int_sz, SEEK_CUR)
+ return [('length', ctypes.c_int)], length
+
+ @classmethod
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return cls._to_python(ctypes_object, *args, **kwargs)
+
+ @classmethod
+ async def to_python_async(cls, ctypes_object, *args, **kwargs):
+ return await cls._to_python_async(ctypes_object, *args, **kwargs)
+
+ @classmethod
+ def from_python(cls, stream, value, type_id=None):
+ return cls._from_python(stream, value, type_id)
+
+ @classmethod
+ async def from_python_async(cls, stream, value, type_id=None):
+ return await cls._from_python_async(stream, value, type_id)
+
+ @classmethod
+ def _write_header(cls, stream, type_id, length):
+ stream.write(length.to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
+ byteorder=PROTOCOL_BYTE_ORDER
+ ))
-class MapObject(Map):
+class MapObject(Nullable, _MapBase):
"""
This is a dictionary type.
@@ -531,61 +488,65 @@ class MapObject(Map):
default = {}
@classmethod
- def build_header(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('length', ctypes.c_int),
- ('type', ctypes.c_byte),
- ],
- }
- )
+ def parse_not_null(cls, stream):
+ return cls._parse(stream)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- obj_type = getattr(ctype_object, "type", None)
- if obj_type:
- return obj_type, super().to_python(ctype_object, *args, **kwargs)
- return None
+ async def parse_not_null_async(cls, stream):
+ return await cls._parse_async(stream)
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- obj_type = getattr(ctype_object, "type", None)
- if obj_type:
- return obj_type, await super().to_python_async(ctype_object, *args, **kwargs)
- return None
+ def _parse_header(cls, stream):
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(int_sz + 2 * b_sz, SEEK_CUR)
+ fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)]
+ return fields, length
@classmethod
- def __get_obj_type(cls, ctype_object):
- return getattr(ctype_object, "type", None)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs)
@classmethod
- def from_python(cls, stream, value, **kwargs):
- type_id, value = cls.__unpack_value(stream, value)
- if value:
- super().from_python(stream, value, type_id)
+ async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs)
@classmethod
- async def from_python_async(cls, stream, value, **kwargs):
- type_id, value = cls.__unpack_value(stream, value)
- if value:
- await super().from_python_async(stream, value, type_id)
+ def from_python_not_null(cls, stream, value, **kwargs):
+ type_id, value = value
+ if value is None:
+ Null.from_python(stream)
+ else:
+ cls._from_python(stream, value, type_id)
@classmethod
- def __unpack_value(cls, stream, value):
+ async def from_python_not_null_async(cls, stream, value, **kwargs):
+ type_id, value = value
if value is None:
Null.from_python(stream)
- return None, None
+ else:
+ await cls._from_python_async(stream, value, type_id)
- return value
+ @classmethod
+ def _write_header(cls, stream, type_id, length):
+ stream.write(cls.type_code)
+ stream.write(length.to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
+ byteorder=PROTOCOL_BYTE_ORDER)
+ )
+ stream.write(type_id.to_bytes(
+ length=ctypes.sizeof(ctypes.c_byte),
+ byteorder=PROTOCOL_BYTE_ORDER,
+ signed=True)
+ )
class BinaryObject(Nullable):
_type_id = TYPE_BINARY_OBJ
+ _header_class = None
type_code = TC_COMPLEX_OBJECT
USER_TYPE = 0x0001
@@ -615,24 +576,26 @@ class BinaryObject(Nullable):
return value._hashcode
@classmethod
- def build_header(cls):
- return type(
- cls.__name__,
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('version', ctypes.c_byte),
- ('flags', ctypes.c_short),
- ('type_id', ctypes.c_int),
- ('hash_code', ctypes.c_int),
- ('length', ctypes.c_int),
- ('schema_id', ctypes.c_int),
- ('schema_offset', ctypes.c_int),
- ],
- }
- )
+ def get_header_class(cls):
+ if not cls._header_class:
+ cls._header_class = type(
+ cls.__name__,
+ (ctypes.LittleEndianStructure,),
+ {
+ '_pack_': 1,
+ '_fields_': [
+ ('type_code', ctypes.c_byte),
+ ('version', ctypes.c_byte),
+ ('flags', ctypes.c_short),
+ ('type_id', ctypes.c_int),
+ ('hash_code', ctypes.c_int),
+ ('length', ctypes.c_int),
+ ('schema_id', ctypes.c_int),
+ ('schema_offset', ctypes.c_int),
+ ],
+ }
+ )
+ return cls._header_class
@classmethod
def offset_c_type(cls, flags: int):
@@ -686,7 +649,7 @@ class BinaryObject(Nullable):
@classmethod
def __parse_header(cls, stream):
- header_class = cls.build_header()
+ header_class = cls.get_header_class()
header = stream.read_ctype(header_class)
stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
return header, header_class
@@ -717,51 +680,51 @@ class BinaryObject(Nullable):
return final_class
@classmethod
- def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
- type_id = cls.__get_type_id(ctype_object, client)
- if type_id:
- data_class = client.query_binary_type(type_id, ctype_object.schema_id)
-
- result = data_class()
- result.version = ctype_object.version
- for field_name, field_type in data_class.schema.items():
- setattr(
- result, field_name, field_type.to_python(
- getattr(ctype_object.object_fields, field_name),
- client, *args, **kwargs
- )
- )
- return result
+ def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs):
+ type_id = ctypes_object.type_id
+ if not client:
+ raise ParseError(f'Can not query binary type {type_id}')
- return None
+ data_class = client.query_binary_type(type_id, ctypes_object.schema_id)
+ result = data_class()
+ result.version = ctypes_object.version
- @classmethod
- async def to_python_async(cls, ctype_object, client: 'AioClient' = None, *args, **kwargs):
- type_id = cls.__get_type_id(ctype_object, client)
- if type_id:
- data_class = await client.query_binary_type(type_id, ctype_object.schema_id)
-
- result = data_class()
- result.version = ctype_object.version
-
- field_values = await asyncio.gather(
- *[
- field_type.to_python_async(
- getattr(ctype_object.object_fields, field_name), client, *args, **kwargs
- )
- for field_name, field_type in data_class.schema.items()
- ]
+ for field_name, field_type in data_class.schema.items():
+ setattr(
+ result, field_name, field_type.to_python(
+ getattr(ctypes_object.object_fields, field_name),
+ client, *args, **kwargs
+ )
)
+ return result
- for i, field_name in enumerate(data_class.schema.keys()):
- setattr(result, field_name, field_values[i])
+ @classmethod
+ async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs):
+ type_id = ctypes_object.type_id
+ if not client:
+ raise ParseError(f'Can not query binary type {type_id}')
- return result
- return None
+ data_class = await client.query_binary_type(type_id, ctypes_object.schema_id)
+ result = data_class()
+ result.version = ctypes_object.version
+
+ field_values = await asyncio.gather(
+ *[
+ field_type.to_python_async(
+ getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs
+ )
+ for field_name, field_type in data_class.schema.items()
+ ]
+ )
+
+ for i, field_name in enumerate(data_class.schema.keys()):
+ setattr(result, field_name, field_values[i])
+
+ return result
@classmethod
- def __get_type_id(cls, ctype_object, client):
- type_id = getattr(ctype_object, "type_id", None)
+ def __get_type_id(cls, ctypes_object, client):
+ type_id = getattr(ctypes_object, "type_id", None)
if type_id:
if not client:
raise ParseError(f'Can not query binary type {type_id}')
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 55ed844..9bd1b76 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -136,15 +136,15 @@ class Conditional:
return await self.var1.parse_async(stream)
return await self.var2.parse_async(stream)
- def to_python(self, ctype_object, context, *args, **kwargs):
+ def to_python(self, ctypes_object, context, *args, **kwargs):
if self.predicate2(context):
- return self.var1.to_python(ctype_object, *args, **kwargs)
- return self.var2.to_python(ctype_object, *args, **kwargs)
+ return self.var1.to_python(ctypes_object, *args, **kwargs)
+ return self.var2.to_python(ctypes_object, *args, **kwargs)
- async def to_python_async(self, ctype_object, context, *args, **kwargs):
+ async def to_python_async(self, ctypes_object, context, *args, **kwargs):
if self.predicate2(context):
- return await self.var1.to_python_async(ctype_object, *args, **kwargs)
- return await self.var2.to_python_async(ctype_object, *args, **kwargs)
+ return await self.var1.to_python_async(ctypes_object, *args, **kwargs)
+ return await self.var2.to_python_async(ctypes_object, *args, **kwargs)
@attr.s
@@ -154,67 +154,56 @@ class StructArray:
counter_type = attr.ib(default=ctypes.c_int)
defaults = attr.ib(type=dict, default={})
- def build_header_class(self):
- return type(
- self.__class__.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', self.counter_type),
- ],
- },
- )
-
def parse(self, stream):
- fields, length = [], self.__parse_length(stream)
+ fields, length = self.__parse_header(stream)
for i in range(length):
c_type = Struct(self.following).parse(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return self.__build_final_class(fields)
+ return self.build_c_type(fields)
async def parse_async(self, stream):
- fields, length = [], self.__parse_length(stream)
+ fields, length = self.__parse_header(stream)
for i in range(length):
c_type = await Struct(self.following).parse_async(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- return self.__build_final_class(fields)
+ return self.build_c_type(fields)
- def __parse_length(self, stream):
- counter_type_len = ctypes.sizeof(self.counter_type)
+ def __parse_header(self, stream):
+ counter_sz = ctypes.sizeof(self.counter_type)
length = int.from_bytes(
- stream.slice(offset=counter_type_len),
+ stream.slice(offset=counter_sz),
byteorder=PROTOCOL_BYTE_ORDER
)
- stream.seek(counter_type_len, SEEK_CUR)
- return length
+ stream.seek(counter_sz, SEEK_CUR)
+ return [('length', self.counter_type)], length
- def __build_final_class(self, fields):
+ @staticmethod
+ def build_c_type(fields):
return type(
'StructArray',
- (self.build_header_class(),),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': fields,
},
)
- def to_python(self, ctype_object, *args, **kwargs):
- length = getattr(ctype_object, 'length', 0)
+ def to_python(self, ctypes_object, *args, **kwargs):
+ length = getattr(ctypes_object, 'length', 0)
return [
- Struct(self.following, dict_type=dict).to_python(getattr(ctype_object, 'element_{}'.format(i)),
+ Struct(self.following, dict_type=dict).to_python(getattr(ctypes_object, f'element_{i}'),
*args, **kwargs)
for i in range(length)
]
- async def to_python_async(self, ctype_object, *args, **kwargs):
- length = getattr(ctype_object, 'length', 0)
+ async def to_python_async(self, ctypes_object, *args, **kwargs):
+ length = getattr(ctypes_object, 'length', 0)
result_coro = [
- Struct(self.following, dict_type=dict).to_python_async(getattr(ctype_object, 'element_{}'.format(i)),
+ Struct(self.following, dict_type=dict).to_python_async(getattr(ctypes_object, f'element_{i}'),
*args, **kwargs)
for i in range(length)
]
@@ -239,10 +228,10 @@ class StructArray:
await el_class.from_python_async(stream, v[name])
def __write_header(self, stream, length):
- header_class = self.build_header_class()
- header = header_class()
- header.length = length
- stream.write(header)
+ stream.write(
+ length.to_bytes(ctypes.sizeof(self.counter_type),
+ byteorder=PROTOCOL_BYTE_ORDER)
+ )
@attr.s
@@ -262,7 +251,7 @@ class Struct:
if name in ctx:
ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
- return self.__build_final_class(fields)
+ return self.build_c_type(fields)
async def parse_async(self, stream):
fields, ctx = [], self.__prepare_conditional_ctx()
@@ -274,7 +263,7 @@ class Struct:
if name in ctx:
ctx[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
- return self.__build_final_class(fields)
+ return self.build_c_type(fields)
def __prepare_conditional_ctx(self):
ctx = {}
@@ -285,7 +274,7 @@ class Struct:
return ctx
@staticmethod
- def __build_final_class(fields):
+ def build_c_type(fields):
return type(
'Struct',
(ctypes.LittleEndianStructure,),
@@ -295,34 +284,34 @@ class Struct:
},
)
- def to_python(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]:
+ def to_python(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]:
result = self.dict_type()
for name, c_type in self.fields:
is_cond = isinstance(c_type, Conditional)
result[name] = c_type.to_python(
- getattr(ctype_object, name),
+ getattr(ctypes_object, name),
result,
*args, **kwargs
) if is_cond else c_type.to_python(
- getattr(ctype_object, name),
+ getattr(ctypes_object, name),
*args, **kwargs
)
return result
- async def to_python_async(self, ctype_object, *args, **kwargs) -> Union[dict, OrderedDict]:
+ async def to_python_async(self, ctypes_object, *args, **kwargs) -> Union[dict, OrderedDict]:
result = self.dict_type()
for name, c_type in self.fields:
is_cond = isinstance(c_type, Conditional)
if is_cond:
value = await c_type.to_python_async(
- getattr(ctype_object, name),
+ getattr(ctypes_object, name),
result,
*args, **kwargs
)
else:
value = await c_type.to_python_async(
- getattr(ctype_object, name),
+ getattr(ctypes_object, name),
*args, **kwargs
)
result[name] = value
@@ -405,18 +394,18 @@ class AnyDataObject:
raise ParseError('Unknown type code: `{}`'.format(type_code))
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- data_class = cls.__data_class_from_ctype(ctype_object)
- return data_class.to_python(ctype_object)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ data_class = cls.__data_class_from_ctype(ctypes_object)
+ return data_class.to_python(ctypes_object)
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- data_class = cls.__data_class_from_ctype(ctype_object)
- return await data_class.to_python_async(ctype_object)
+ async def to_python_async(cls, ctypes_object, *args, **kwargs):
+ data_class = cls.__data_class_from_ctype(ctypes_object)
+ return await data_class.to_python_async(ctypes_object)
@classmethod
- def __data_class_from_ctype(cls, ctype_object):
- type_code = ctype_object.type_code.to_bytes(
+ def __data_class_from_ctype(cls, ctypes_object):
+ type_code = ctypes_object.type_code.to_bytes(
ctypes.sizeof(ctypes.c_byte),
byteorder=PROTOCOL_BYTE_ORDER
)
@@ -440,7 +429,7 @@ class AnyDataObject:
int: LongObject,
float: DoubleObject,
str: String,
- bytes: String,
+ bytes: ByteArrayObject,
bytearray: ByteArrayObject,
bool: BoolObject,
type(None): Null,
@@ -455,7 +444,6 @@ class AnyDataObject:
int: LongArrayObject,
float: DoubleArrayObject,
str: StringArrayObject,
- bytes: StringArrayObject,
bool: BoolArrayObject,
uuid.UUID: UUIDArrayObject,
datetime: DateArrayObject,
@@ -558,48 +546,33 @@ class AnyDataArray(AnyDataObject):
"""
counter_type = attr.ib(default=ctypes.c_int)
- def build_header(self):
- return type(
- self.__class__.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', self.counter_type),
- ],
- }
- )
-
def parse(self, stream):
- header, header_class = self.__parse_header(stream)
-
- fields = []
- for i in range(header.length):
+ fields, length = self.__parse_header(stream)
+ for i in range(length):
c_type = super().parse(stream)
- fields.append(('element_{}'.format(i), c_type))
-
- return self.__build_final_class(header_class, fields)
+ fields.append((f'element_{i}', c_type))
+ return self.build_c_type(fields)
async def parse_async(self, stream):
- header, header_class = self.__parse_header(stream)
-
- fields = []
- for i in range(header.length):
+ fields, length = self.__parse_header(stream)
+ for i in range(length):
c_type = await super().parse_async(stream)
- fields.append(('element_{}'.format(i), c_type))
-
- return self.__build_final_class(header_class, fields)
+ fields.append((f'element_{i}', c_type))
+ return self.build_c_type(fields)
def __parse_header(self, stream):
- header_class = self.build_header()
- header = stream.read_ctype(header_class)
- stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
- return header, header_class
+ cnt_sz = ctypes.sizeof(self.counter_type)
+ length = int.from_bytes(
+ stream.slice(stream.tell(), cnt_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(cnt_sz, SEEK_CUR)
+ return [('length', self.counter_type)], length
- def __build_final_class(self, header_class, fields):
+ def build_c_type(self, fields):
return type(
self.__class__.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': fields,
@@ -607,56 +580,50 @@ class AnyDataArray(AnyDataObject):
)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- length = cls.__get_length(ctype_object)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ length = getattr(ctypes_object, "length", 0)
return [
- super().to_python(getattr(ctype_object, 'element_{}'.format(i)), *args, **kwargs)
+ super().to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
for i in range(length)
]
@classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- length = cls.__get_length(ctype_object)
+ async def to_python_async(cls, ctypes_object, *args, **kwargs):
+ length = getattr(ctypes_object, "length", 0)
values = asyncio.gather(
*[
super().to_python(
- getattr(ctype_object, 'element_{}'.format(i)),
+ getattr(ctypes_object, f'element_{i}'),
*args, **kwargs
) for i in range(length)
]
)
return await values
- @staticmethod
- def __get_length(ctype_object):
- return getattr(ctype_object, "length", None)
-
def from_python(self, stream, value):
- try:
- length = len(value)
- except TypeError:
- value = [value]
- length = 1
- self.__write_header(stream, length)
+ value = self.__write_header_and_process_value(stream, value)
for x in value:
infer_from_python(stream, x)
async def from_python_async(self, stream, value):
+ value = self.__write_header_and_process_value(stream, value)
+
+ for x in value:
+ await infer_from_python_async(stream, x)
+
+ def __write_header_and_process_value(self, stream, value):
try:
length = len(value)
except TypeError:
value = [value]
length = 1
- self.__write_header(stream, length)
- for x in value:
- await infer_from_python_async(stream, x)
+ stream.write(length.to_bytes(
+ ctypes.sizeof(self.counter_type),
+ byteorder=PROTOCOL_BYTE_ORDER
+ ))
- def __write_header(self, stream, length):
- header_class = self.build_header()
- header = header_class()
- header.length = length
- stream.write(header)
+ return value
diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py
index 3bbb196..037f680 100644
--- a/pyignite/datatypes/primitive.py
+++ b/pyignite/datatypes/primitive.py
@@ -52,8 +52,8 @@ class Primitive(IgniteDataType):
return cls.c_type
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return ctype_object
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object
class Byte(Primitive):
@@ -122,8 +122,8 @@ class Char(Primitive):
c_type = ctypes.c_short
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return ctype_object.value.to_bytes(
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.value.to_bytes(
ctypes.sizeof(cls.c_type),
byteorder=PROTOCOL_BYTE_ORDER
).decode(PROTOCOL_CHAR_ENCODING)
@@ -147,9 +147,9 @@ class Bool(Primitive):
c_type = ctypes.c_byte # Use c_byte because c_bool throws endianness conversion error on BE systems.
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return ctype_object != 0
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object != 0
@classmethod
- def from_python(cls, stream, value):
+ def from_python(cls, stream, value, **kwargs):
stream.write(struct.pack("<b", 1 if value else 0))
diff --git a/pyignite/datatypes/primitive_arrays.py b/pyignite/datatypes/primitive_arrays.py
index a21de77..e1d4289 100644
--- a/pyignite/datatypes/primitive_arrays.py
+++ b/pyignite/datatypes/primitive_arrays.py
@@ -17,6 +17,7 @@ import ctypes
from io import SEEK_CUR
from pyignite.constants import *
+from .base import IgniteDataType
from .null_object import Nullable
from .primitive import *
from .type_codes import *
@@ -32,70 +33,50 @@ __all__ = [
]
-class PrimitiveArray(Nullable):
+class PrimitiveArray(IgniteDataType):
"""
Base class for array of primitives. Payload-only.
"""
_type_name = None
_type_id = None
primitive_type = None
- type_code = None
@classmethod
- def build_header_class(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ],
- }
+ def build_c_type(cls, stream):
+ length = int.from_bytes(
+ stream.slice(stream.tell(), ctypes.sizeof(ctypes.c_int)),
+ byteorder=PROTOCOL_BYTE_ORDER
)
- @classmethod
- def parse_not_null(cls, stream):
- header_class = cls.build_header_class()
- header = stream.read_ctype(header_class)
-
- final_class = type(
+ return type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure, ),
{
'_pack_': 1,
'_fields_': [
- ('data', cls.primitive_type.c_type * header.length),
+ ('length', ctypes.c_int),
+ ('data', cls.primitive_type.c_type * length),
],
}
)
- stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
- return final_class
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- length = getattr(ctype_object, "length", None)
- if length is None:
- return None
- return [ctype_object.data[i] for i in range(ctype_object.length)]
+ def parse(cls, stream):
+ c_type = cls.build_c_type(stream)
+ stream.seek(ctypes.sizeof(c_type), SEEK_CUR)
+ return c_type
@classmethod
- async def to_python_async(cls, ctypes_object, *args, **kwargs):
- return cls.to_python(ctypes_object, *args, **kwargs)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return [ctypes_object.data[i] for i in range(ctypes_object.length)]
@classmethod
- def from_python_not_null(cls, stream, value, **kwargs):
- header_class = cls.build_header_class()
- header = header_class()
- if hasattr(header, 'type_code'):
- header.type_code = int.from_bytes(
- cls.type_code,
- byteorder=PROTOCOL_BYTE_ORDER
- )
- length = len(value)
- header.length = length
+ def _write_header(cls, stream, value):
+ stream.write(len(value).to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
- stream.write(header)
+ @classmethod
+ def from_python(cls, stream, value, **kwargs):
+ cls._write_header(stream, value)
for x in value:
cls.primitive_type.from_python(stream, x)
@@ -107,19 +88,12 @@ class ByteArray(PrimitiveArray):
type_code = TC_BYTE_ARRAY
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- data = getattr(ctype_object, "data", None)
- if data is None:
- return None
- return bytearray(data)
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return bytes(ctypes_object.data)
@classmethod
- def from_python(cls, stream, value):
- header_class = cls.build_header_class()
- header = header_class()
- header.length = len(value)
-
- stream.write(header)
+ def from_python(cls, stream, value, **kwargs):
+ cls._write_header(stream, value)
stream.write(bytearray(value))
@@ -172,29 +146,58 @@ class BoolArray(PrimitiveArray):
type_code = TC_BOOL_ARRAY
-class PrimitiveArrayObject(PrimitiveArray):
+class PrimitiveArrayObject(Nullable):
"""
Base class for primitive array object. Type code plus payload.
"""
_type_name = None
_type_id = None
+ primitive_type = None
+ type_code = None
pythonic = list
default = []
@classmethod
- def build_header_class(cls):
+ def build_c_type(cls, stream):
+ length = int.from_bytes(
+ stream.slice(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+
return type(
- cls.__name__ + 'Header',
+ cls.__name__,
(ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': [
('type_code', ctypes.c_byte),
('length', ctypes.c_int),
+ ('data', cls.primitive_type.c_type * length),
],
}
)
+ @classmethod
+ def parse_not_null(cls, stream):
+ c_type = cls.build_c_type(stream)
+ stream.seek(ctypes.sizeof(c_type), SEEK_CUR)
+ return c_type
+
+ @classmethod
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return [ctypes_object.data[i] for i in range(ctypes_object.length)]
+
+ @classmethod
+ def from_python_not_null(cls, stream, value, **kwargs):
+ cls._write_header(stream, value)
+ for x in value:
+ cls.primitive_type.from_python(stream, x)
+
+ @classmethod
+ def _write_header(cls, stream, value):
+ stream.write(cls.type_code)
+ stream.write(len(value).to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER))
+
class ByteArrayObject(PrimitiveArrayObject):
_type_name = NAME_BYTE_ARR
@@ -203,19 +206,12 @@ class ByteArrayObject(PrimitiveArrayObject):
type_code = TC_BYTE_ARRAY
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return ByteArray.to_python(ctype_object, *args, **kwargs)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return bytes(ctypes_object.data)
@classmethod
- def from_python_not_null(cls, stream, value):
- header_class = cls.build_header_class()
- header = header_class()
- header.type_code = int.from_bytes(
- cls.type_code,
- byteorder=PROTOCOL_BYTE_ORDER
- )
- header.length = len(value)
- stream.write(header)
+ def from_python_not_null(cls, stream, value, **kwargs):
+ cls._write_header(stream, value)
if isinstance(value, (bytes, bytearray)):
stream.write(value)
@@ -281,10 +277,8 @@ class CharArrayObject(PrimitiveArrayObject):
type_code = TC_CHAR_ARRAY
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- values = super().to_python(ctype_object, *args, **kwargs)
- if values is None:
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ values = super().to_python_not_null(ctypes_object, *args, **kwargs)
return [
v.to_bytes(
ctypes.sizeof(cls.primitive_type.c_type),
@@ -302,11 +296,5 @@ class BoolArrayObject(PrimitiveArrayObject):
type_code = TC_BOOL_ARRAY
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- if not ctype_object:
- return None
- length = getattr(ctype_object, "length", None)
- if length is None:
- return None
-
- return [ctype_object.data[i] != 0 for i in range(length)]
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return [ctypes_object.data[i] != 0 for i in range(ctypes_object.length)]
diff --git a/pyignite/datatypes/primitive_objects.py b/pyignite/datatypes/primitive_objects.py
index 5849935..9b23ec9 100644
--- a/pyignite/datatypes/primitive_objects.py
+++ b/pyignite/datatypes/primitive_objects.py
@@ -65,12 +65,8 @@ class DataObject(Nullable):
return data_type
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- return getattr(ctype_object, "value", None)
-
- @classmethod
- async def to_python_async(cls, ctype_object, *args, **kwargs):
- return cls.to_python(ctype_object, *args, **kwargs)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.value
@classmethod
def from_python_not_null(cls, stream, value, **kwargs):
@@ -188,10 +184,8 @@ class CharObject(DataObject):
return ord(value)
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- value = getattr(ctype_object, "value", None)
- if value is None:
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ value = ctypes_object.value
return value.to_bytes(
ctypes.sizeof(cls.c_type),
byteorder=PROTOCOL_BYTE_ORDER
@@ -224,8 +218,5 @@ class BoolObject(DataObject):
return 1231 if value else 1237
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- value = getattr(ctype_object, "value", None)
- if value is None:
- return None
- return value != 0
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.value != 0
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 4ca6795..5657afb 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -23,6 +23,7 @@ import uuid
from pyignite.constants import *
from pyignite.utils import datetime_hashcode, decimal_hashcode, hashcode
+from .base import IgniteDataType
from .type_codes import *
from .type_ids import *
from .type_names import *
@@ -100,14 +101,14 @@ class String(Nullable):
return data_type
@classmethod
- def to_python_not_null(cls, ctype_object, *args, **kwargs):
- if ctype_object.length > 0:
- return ctype_object.data.decode(PROTOCOL_STRING_ENCODING)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ if ctypes_object.length > 0:
+ return ctypes_object.data.decode(PROTOCOL_STRING_ENCODING)
return ''
@classmethod
- def from_python_not_null(cls, stream, value):
+ def from_python_not_null(cls, stream, value, **kwargs):
if isinstance(value, str):
value = value.encode(PROTOCOL_STRING_ENCODING)
length = len(value)
@@ -135,7 +136,7 @@ class DecimalObject(Nullable):
return decimal_hashcode(value)
@classmethod
- def build_c_header(cls):
+ def build_c_type(cls, length):
return type(
cls.__name__,
(ctypes.LittleEndianStructure,),
@@ -145,48 +146,41 @@ class DecimalObject(Nullable):
('type_code', ctypes.c_byte),
('scale', ctypes.c_int),
('length', ctypes.c_int),
- ],
+ ('data', ctypes.c_ubyte * length)
+ ]
}
)
@classmethod
def parse_not_null(cls, stream):
- header_class = cls.build_c_header()
- header = stream.read_ctype(header_class)
-
- data_type = type(
- cls.__name__,
- (header_class,),
- {
- '_pack_': 1,
- '_fields_': [
- ('data', ctypes.c_ubyte * header.length),
- ],
- }
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + int_sz + b_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
)
-
+ data_type = cls.build_c_type(length)
stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
return data_type
@classmethod
- def to_python_not_null(cls, ctype_object, *args, **kwargs):
- sign = 1 if ctype_object.data[0] & 0x80 else 0
- data = ctype_object.data[1:]
- data.insert(0, ctype_object.data[0] & 0x7f)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ sign = 1 if ctypes_object.data[0] & 0x80 else 0
+ data = ctypes_object.data[1:]
+ data.insert(0, ctypes_object.data[0] & 0x7f)
# decode n-byte integer
result = sum([
[x for x in reversed(data)][i] * 0x100 ** i for i in
range(len(data))
])
# apply scale
- result = result / decimal.Decimal('10') ** decimal.Decimal(ctype_object.scale)
+ result = result / decimal.Decimal('10') ** decimal.Decimal(ctypes_object.scale)
if sign:
# apply sign
result = -result
return result
@classmethod
- def from_python_not_null(cls, stream, value: decimal.Decimal):
+ def from_python_not_null(cls, stream, value: decimal.Decimal, **kwargs):
sign, digits, scale = value.normalize().as_tuple()
integer = int(''.join([str(d) for d in digits]))
# calculate number of bytes (at least one, and not forget the sign bit)
@@ -202,17 +196,7 @@ class DecimalObject(Nullable):
data[0] |= 0x80
else:
data[0] &= 0x7f
- header_class = cls.build_c_header()
- data_class = type(
- cls.__name__,
- (header_class,),
- {
- '_pack_': 1,
- '_fields_': [
- ('data', ctypes.c_ubyte * length),
- ],
- }
- )
+ data_class = cls.build_c_type(length)
data_object = data_class()
data_object.type_code = int.from_bytes(
cls.type_code,
@@ -266,7 +250,7 @@ class UUIDObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python_not_null(cls, stream, value: uuid.UUID):
+ def from_python_not_null(cls, stream, value: uuid.UUID, **kwargs):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -381,7 +365,7 @@ class DateObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python_not_null(cls, stream, value: [date, datetime]):
+ def from_python_not_null(cls, stream, value: [date, datetime], **kwargs):
if type(value) is date:
value = datetime.combine(value, time())
data_type = cls.build_c_type()
@@ -433,7 +417,7 @@ class TimeObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python_not_null(cls, stream, value: timedelta):
+ def from_python_not_null(cls, stream, value: timedelta, **kwargs):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -480,7 +464,7 @@ class EnumObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python_not_null(cls, stream, value: tuple):
+ def from_python_not_null(cls, stream, value: tuple, **kwargs):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -505,84 +489,89 @@ class BinaryEnumObject(EnumObject):
type_code = TC_BINARY_ENUM
-class StandardArray(Nullable):
- """
- Base class for array of primitives. Payload-only.
- """
- _type_name = None
- _type_id = None
+class _StandardArrayBase:
standard_type = None
- type_code = None
@classmethod
- def build_header_class(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('length', ctypes.c_int),
- ],
- }
- )
+ def _parse_header(cls, stream):
+ raise NotImplementedError
@classmethod
- def parse_not_null(cls, stream):
- header_class = cls.build_header_class()
- header = stream.read_ctype(header_class)
- stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+ def _parse(cls, stream):
+ fields, length = cls._parse_header(stream)
- fields = []
- for i in range(header.length):
+ for i in range(length):
c_type = cls.standard_type.parse(stream)
- fields.append(('element_{}'.format(i), c_type))
+ fields.append((f'element_{i}', c_type))
- final_class = type(
+ return type(
cls.__name__,
- (header_class,),
+ (ctypes.LittleEndianStructure,),
{
'_pack_': 1,
'_fields_': fields,
}
)
- return final_class
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- length = getattr(ctype_object, "length", None)
- if length is None:
- return None
+ def _write_header(cls, stream, value, **kwargs):
+ raise NotImplementedError
- result = []
- for i in range(length):
- result.append(
- cls.standard_type.to_python(
- getattr(ctype_object, 'element_{}'.format(i)),
- *args, **kwargs
- )
- )
- return result
+ @classmethod
+ def _from_python(cls, stream, value, **kwargs):
+ cls._write_header(stream, value, **kwargs)
+ for x in value:
+ cls.standard_type.from_python(stream, x)
@classmethod
- async def to_python_async(cls, ctypes_object, *args, **kwargs):
- return cls.to_python(ctypes_object, *args, **kwargs)
+ def _to_python(cls, ctypes_object, *args, **kwargs):
+ length = ctypes_object.length
+ return [
+ cls.standard_type.to_python(
+ getattr(ctypes_object, f'element_{i}'), *args, **kwargs
+ ) for i in range(length)
+ ]
+
+
+class StandardArray(IgniteDataType, _StandardArrayBase):
+ """
+ Base class for array of primitives. Payload-only.
+ """
+ _type_name = None
+ _type_id = None
+ type_code = None
@classmethod
- def from_python_not_null(cls, stream, value, **kwargs):
- header_class = cls.build_header_class()
- header = header_class()
- if hasattr(header, 'type_code'):
- header.type_code = int.from_bytes(
- cls.type_code,
+ def _parse_header(cls, stream):
+ int_sz = ctypes.sizeof(ctypes.c_int)
+ length = int.from_bytes(
+ stream.slice(stream.tell(), int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(int_sz, SEEK_CUR)
+
+ return [('length', ctypes.c_int)], length
+
+ @classmethod
+ def parse(cls, stream):
+ return cls._parse(stream)
+
+ @classmethod
+ def _write_header(cls, stream, value, **kwargs):
+ stream.write(
+ len(value).to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
byteorder=PROTOCOL_BYTE_ORDER
)
- length = len(value)
- header.length = length
+ )
- stream.write(header)
- for x in value:
- cls.standard_type.from_python(stream, x)
+ @classmethod
+ def from_python(cls, stream, value, **kwargs):
+ cls._from_python(stream, value, **kwargs)
+
+ @classmethod
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ return cls._to_python(ctypes_object, *args, **kwargs)
class StringArray(StandardArray):
@@ -633,26 +622,47 @@ class EnumArray(StandardArray):
standard_type = EnumObject
-class StandardArrayObject(StandardArray):
+class StandardArrayObject(Nullable, _StandardArrayBase):
_type_name = None
_type_id = None
+ standard_type = None
+ type_code = None
pythonic = list
default = []
@classmethod
- def build_header_class(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('length', ctypes.c_int),
- ],
- }
+ def _parse_header(cls, stream):
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(int_sz + b_sz, SEEK_CUR)
+
+ return [('type_code', ctypes.c_byte), ('length', ctypes.c_int)], length
+
+ @classmethod
+ def parse_not_null(cls, stream):
+ return cls._parse(stream)
+
+ @classmethod
+ def _write_header(cls, stream, value, **kwargs):
+ stream.write(cls.type_code)
+ stream.write(
+ len(value).to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
)
+ @classmethod
+ def from_python_not_null(cls, stream, value, **kwargs):
+ cls._from_python(stream, value, **kwargs)
+
+ @classmethod
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return cls._to_python(ctypes_object, *args, **kwargs)
+
class StringArrayObject(StandardArrayObject):
""" List of strings. """
@@ -714,45 +724,43 @@ class EnumArrayObject(StandardArrayObject):
standard_type = EnumObject
type_code = TC_ENUM_ARRAY
+ OBJECT = -1
+
@classmethod
- def build_header_class(cls):
- return type(
- cls.__name__ + 'Header',
- (ctypes.LittleEndianStructure,),
- {
- '_pack_': 1,
- '_fields_': [
- ('type_code', ctypes.c_byte),
- ('type_id', ctypes.c_int),
- ('length', ctypes.c_int),
- ],
- }
+ def _parse_header(cls, stream):
+ int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte)
+ length = int.from_bytes(
+ stream.slice(stream.tell() + b_sz + int_sz, int_sz),
+ byteorder=PROTOCOL_BYTE_ORDER
)
+ stream.seek(2 * int_sz + b_sz, SEEK_CUR)
+ return [('type_code', ctypes.c_byte), ('type_id', ctypes.c_int), ('length', ctypes.c_int)], length
@classmethod
- def from_python_not_null(cls, stream, value, **kwargs):
- type_id, value = value
- header_class = cls.build_header_class()
- header = header_class()
- if hasattr(header, 'type_code'):
- header.type_code = int.from_bytes(
- cls.type_code,
+ def _write_header(cls, stream, value, type_id=-1):
+ stream.write(cls.type_code)
+ stream.write(
+ type_id.to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
+ byteorder=PROTOCOL_BYTE_ORDER,
+ signed=True
+ )
+ )
+ stream.write(
+ len(value).to_bytes(
+ length=ctypes.sizeof(ctypes.c_int),
byteorder=PROTOCOL_BYTE_ORDER
)
- length = len(value)
- header.length = length
- header.type_id = type_id
+ )
- stream.write(header)
- for x in value:
- cls.standard_type.from_python(stream, x)
+ @classmethod
+ def from_python_not_null(cls, stream, value, **kwargs):
+ type_id, value = value
+ super().from_python_not_null(stream, value, type_id=type_id)
@classmethod
- def to_python_not_null(cls, ctype_object, *args, **kwargs):
- type_id = getattr(ctype_object, "type_id", None)
- if type_id is None:
- return None
- return type_id, super().to_python(ctype_object, *args, **kwargs)
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ return ctypes_object.type_id, cls._to_python(ctypes_object, *args, **kwargs)
class BinaryEnumArrayObject(EnumArrayObject):
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index f0338e1..c0311ec 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -128,25 +128,25 @@ class Response:
c_type = await ignite_type.parse_async(stream)
fields.append((name, c_type))
- def to_python(self, ctype_object, *args, **kwargs):
+ def to_python(self, ctypes_object, *args, **kwargs):
if not self.following:
return None
result = OrderedDict()
for name, c_type in self.following:
result[name] = c_type.to_python(
- getattr(ctype_object, name),
+ getattr(ctypes_object, name),
*args, **kwargs
)
return result
- async def to_python_async(self, ctype_object, *args, **kwargs):
+ async def to_python_async(self, ctypes_object, *args, **kwargs):
if not self.following:
return None
values = await asyncio.gather(
- *[c_type.to_python_async(getattr(ctype_object, name), *args, **kwargs) for name, c_type in self.following]
+ *[c_type.to_python_async(getattr(ctypes_object, name), *args, **kwargs) for name, c_type in self.following]
)
return OrderedDict([(name, values[i]) for i, (name, _) in enumerate(self.following)])
@@ -239,13 +239,13 @@ class SQLResponse(Response):
('more', ctypes.c_byte),
]
- def to_python(self, ctype_object, *args, **kwargs):
- if getattr(ctype_object, 'status_code', 0) == 0:
- result = self.__to_python_result_header(ctype_object, *args, **kwargs)
+ def to_python(self, ctypes_object, *args, **kwargs):
+ if getattr(ctypes_object, 'status_code', 0) == 0:
+ result = self.__to_python_result_header(ctypes_object, *args, **kwargs)
- for row_item in ctype_object.data._fields_:
+ for row_item in ctypes_object.data._fields_:
row_name = row_item[0]
- row_object = getattr(ctype_object.data, row_name)
+ row_object = getattr(ctypes_object.data, row_name)
row = []
for col_item in row_object._fields_:
col_name = col_item[0]
@@ -254,14 +254,14 @@ class SQLResponse(Response):
result['data'].append(row)
return result
- async def to_python_async(self, ctype_object, *args, **kwargs):
- if getattr(ctype_object, 'status_code', 0) == 0:
- result = self.__to_python_result_header(ctype_object, *args, **kwargs)
+ async def to_python_async(self, ctypes_object, *args, **kwargs):
+ if getattr(ctypes_object, 'status_code', 0) == 0:
+ result = self.__to_python_result_header(ctypes_object, *args, **kwargs)
data_coro = []
- for row_item in ctype_object.data._fields_:
+ for row_item in ctypes_object.data._fields_:
row_name = row_item[0]
- row_object = getattr(ctype_object.data, row_name)
+ row_object = getattr(ctypes_object.data, row_name)
row_coro = []
for col_item in row_object._fields_:
col_name = col_item[0]
@@ -274,18 +274,18 @@ class SQLResponse(Response):
return result
@staticmethod
- def __to_python_result_header(ctype_object, *args, **kwargs):
+ def __to_python_result_header(ctypes_object, *args, **kwargs):
result = {
- 'more': Bool.to_python(ctype_object.more, *args, **kwargs),
+ 'more': Bool.to_python(ctypes_object.more, *args, **kwargs),
'data': [],
}
- if hasattr(ctype_object, 'fields'):
- result['fields'] = StringArray.to_python(ctype_object.fields, *args, **kwargs)
+ if hasattr(ctypes_object, 'fields'):
+ result['fields'] = StringArray.to_python(ctypes_object.fields, *args, **kwargs)
else:
- result['field_count'] = Int.to_python(ctype_object.field_count, *args, **kwargs)
+ result['field_count'] = Int.to_python(ctypes_object.field_count, *args, **kwargs)
- if hasattr(ctype_object, 'cursor'):
- result['cursor'] = Long.to_python(ctype_object.cursor, *args, **kwargs)
+ if hasattr(ctypes_object, 'cursor'):
+ result['cursor'] = Long.to_python(ctypes_object.cursor, *args, **kwargs)
return result
@@ -328,26 +328,26 @@ class BinaryTypeResponse(Response):
return type_exists
- def to_python(self, ctype_object, *args, **kwargs):
- if getattr(ctype_object, 'status_code', 0) == 0:
+ def to_python(self, ctypes_object, *args, **kwargs):
+ if getattr(ctypes_object, 'status_code', 0) == 0:
result = {
- 'type_exists': Bool.to_python(ctype_object.type_exists)
+ 'type_exists': Bool.to_python(ctypes_object.type_exists)
}
- if hasattr(ctype_object, 'body'):
- result.update(body_struct.to_python(ctype_object.body))
+ if hasattr(ctypes_object, 'body'):
+ result.update(body_struct.to_python(ctypes_object.body))
- if hasattr(ctype_object, 'enums'):
- result['enums'] = enum_struct.to_python(ctype_object.enums)
+ if hasattr(ctypes_object, 'enums'):
+ result['enums'] = enum_struct.to_python(ctypes_object.enums)
- if hasattr(ctype_object, 'schema'):
+ if hasattr(ctypes_object, 'schema'):
result['schema'] = {
x['schema_id']: [
z['schema_field_id'] for z in x['schema_fields']
]
- for x in schema_struct.to_python(ctype_object.schema)
+ for x in schema_struct.to_python(ctypes_object.schema)
}
return result
- async def to_python_async(self, ctype_object, *args, **kwargs):
- return self.to_python(ctype_object, *args, **kwargs)
+ async def to_python_async(self, ctypes_object, *args, **kwargs):
+ return self.to_python(ctypes_object, *args, **kwargs)
diff --git a/tests/common/test_datatypes.py b/tests/common/test_datatypes.py
index c1aa19f..6771f94 100644
--- a/tests/common/test_datatypes.py
+++ b/tests/common/test_datatypes.py
@@ -50,6 +50,7 @@ put_get_data_params = [
# arrays of integers
([1, 2, 3, 5], None),
+ (b'buzz', None),
(b'buzz', ByteArrayObject),
(bytearray([7, 8, 8, 11]), None),
(bytearray([7, 8, 8, 11]), ByteArrayObject),
@@ -122,7 +123,7 @@ put_get_data_params = [
((-1, [(6001, 1), (6002, 2), (6003, 3)]), BinaryEnumArrayObject),
# object array
- ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3')]), ObjectArrayObject),
+ ((ObjectArrayObject.OBJECT, [1, 2, decimal.Decimal('3'), bytearray(b'\x10\x20')]), ObjectArrayObject),
# collection
((CollectionObject.LINKED_LIST, [1, 2, 3]), None),
@@ -153,42 +154,47 @@ async def test_put_get_data_async(async_cache, value, value_hint):
bytearray_params = [
- [1, 2, 3, 5],
- (7, 8, 13, 18),
- (-128, -1, 0, 1, 127, 255),
+ ([1, 2, 3, 5], ByteArrayObject),
+ ((7, 8, 13, 18), ByteArrayObject),
+ ((-128, -1, 0, 1, 127, 255), ByteArrayObject),
+ (b'\x01\x03\x10', None),
+ (bytearray(b'\x01\x30'), None)
]
@pytest.mark.parametrize(
- 'value',
+ 'value,type_hint',
bytearray_params
)
-def test_bytearray_from_list_or_tuple(cache, value):
+def test_bytearray_from_different_input(cache, value, type_hint):
"""
ByteArrayObject's pythonic type is `bytearray`, but it should also accept
lists or tuples as a content.
"""
-
- cache.put('my_key', value, value_hint=ByteArrayObject)
-
- assert cache.get('my_key') == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
+ cache.put('my_key', value, value_hint=type_hint)
+ __check_bytearray_from_different_input(cache.get('my_key'), value)
@pytest.mark.parametrize(
- 'value',
+ 'value,type_hint',
bytearray_params
)
@pytest.mark.asyncio
-async def test_bytearray_from_list_or_tuple_async(async_cache, value):
+async def test_bytearray_from_different_input_async(async_cache, value, type_hint):
"""
ByteArrayObject's pythonic type is `bytearray`, but it should also accept
lists or tuples as a content.
"""
-
- await async_cache.put('my_key', value, value_hint=ByteArrayObject)
+ await async_cache.put('my_key', value, value_hint=type_hint)
+ __check_bytearray_from_different_input(await async_cache.get('my_key'), value)
+
- result = await async_cache.get('my_key')
- assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
+def __check_bytearray_from_different_input(result, value):
+ if isinstance(value, (bytes, bytearray)):
+ assert isinstance(result, bytes)
+ assert value == result
+ else:
+ assert result == bytearray([unsigned(ch, ctypes.c_ubyte) for ch in value])
uuid_params = [
diff --git a/tests/common/test_key_value.py b/tests/common/test_key_value.py
index 6e6df61..b03bec2 100644
--- a/tests/common/test_key_value.py
+++ b/tests/common/test_key_value.py
@@ -422,10 +422,13 @@ async def test_put_get_collection_async(async_cache, key, hinted_value, value):
@pytest.fixture
def complex_map():
return {"test" + str(i): ((MapObject.HASH_MAP,
- {"key_1": ((1, ["value_1", 1.0]), CollectionObject),
- "key_2": ((1, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]), CollectionObject),
- "key_3": ((1, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]), CollectionObject),
- "key_4": ((1, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]), CollectionObject),
+ {"key_1": ((CollectionObject.ARR_LIST, ["value_1", 1.0]), CollectionObject),
+ "key_2": ((CollectionObject.ARR_LIST, [["value_2_1", "1.0"], ["value_2_2", "0.25"]]),
+ CollectionObject),
+ "key_3": ((CollectionObject.ARR_LIST, [["value_3_1", "1.0"], ["value_3_2", "0.25"]]),
+ CollectionObject),
+ "key_4": ((CollectionObject.ARR_LIST, [["value_4_1", "1.0"], ["value_4_2", "0.25"]]),
+ CollectionObject),
'key_5': False,
"key_6": "value_6"}), MapObject) for i in range(10000)}
diff --git a/tests/common/test_sql.py b/tests/common/test_sql.py
index 0841b7f..b947fbc 100644
--- a/tests/common/test_sql.py
+++ b/tests/common/test_sql.py
@@ -325,3 +325,131 @@ def __check_query_with_cache(client, cache_fixture):
assert test_value == received
return async_inner() if isinstance(cache, AioCache) else inner()
+
+
+VARBIN_CREATE_QUERY = 'CREATE TABLE VarbinTable(id int primary key, varbin VARBINARY)'
+VARBIN_DROP_QUERY = 'DROP TABLE VarbinTable'
+VARBIN_MERGE_QUERY = 'MERGE INTO VarbinTable(id, varbin) VALUES (?, ?)'
+VARBIN_SELECT_QUERY = 'SELECT * FROM VarbinTable'
+
+VARBIN_TEST_PARAMS = [
+ bytearray('Test message', 'UTF-8'),
+ bytes('Test message', 'UTF-8')
+]
+
+
+@pytest.fixture
+def varbin_table(client):
+ client.sql(VARBIN_CREATE_QUERY)
+ yield None
+ client.sql(VARBIN_DROP_QUERY)
+
+
+@pytest.mark.parametrize(
+ 'value', VARBIN_TEST_PARAMS
+)
+def test_sql_cache_varbinary_handling(client, varbin_table, value):
+ client.sql(VARBIN_MERGE_QUERY, query_args=(1, value))
+ with client.sql(VARBIN_SELECT_QUERY) as cursor:
+ for row in cursor:
+ assert isinstance(row[1], bytes)
+ assert row[1] == value
+ break
+
+
+@pytest.fixture
+async def varbin_table_async(async_client):
+ await async_client.sql(VARBIN_CREATE_QUERY)
+ yield None
+ await async_client.sql(VARBIN_DROP_QUERY)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+ 'value', VARBIN_TEST_PARAMS
+)
+async def test_sql_cache_varbinary_handling_async(async_client, varbin_table_async, value):
+ await async_client.sql(VARBIN_MERGE_QUERY, query_args=(1, value))
+ async with async_client.sql(VARBIN_SELECT_QUERY) as cursor:
+ async for row in cursor:
+ assert isinstance(row[1], bytes)
+ assert row[1] == value
+ break
+
+
+@pytest.fixture
+def varbin_cache_settings():
+ cache_name = 'varbin_cache'
+ table_name = f'{cache_name}_table'.upper()
+
+ yield {
+ PROP_NAME: cache_name,
+ PROP_SQL_SCHEMA: 'PUBLIC',
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ PROP_QUERY_ENTITIES: [
+ {
+ 'table_name': table_name,
+ 'key_field_name': 'ID',
+ 'value_field_name': 'VALUE',
+ 'key_type_name': 'java.lang.Long',
+ 'value_type_name': 'byte[]',
+ 'query_indexes': [],
+ 'field_name_aliases': [],
+ 'query_fields': [
+ {
+ 'name': 'ID',
+ 'type_name': 'java.lang.Long',
+ 'is_key_field': True,
+ 'is_notnull_constraint_field': True,
+ },
+ {
+ 'name': 'VALUE',
+ 'type_name': 'byte[]',
+ },
+ ],
+ },
+ ],
+ }
+
+
+VARBIN_CACHE_TABLE_NAME = 'varbin_cache_table'.upper()
+VARBIN_CACHE_SELECT_QUERY = f'SELECT * FROM {VARBIN_CACHE_TABLE_NAME}'
+
+
+@pytest.fixture
+def varbin_cache(client, varbin_cache_settings):
+ cache = client.get_or_create_cache(varbin_cache_settings)
+ yield cache
+ cache.destroy()
+
+
+@pytest.mark.parametrize(
+ 'value', VARBIN_TEST_PARAMS
+)
+def test_cache_varbinary_handling(client, varbin_cache, value):
+ varbin_cache.put(1, value)
+ with client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor:
+ for row in cursor:
+ assert isinstance(row[1], bytes)
+ assert row[1] == value
+ break
+
+
+@pytest.fixture
+async def varbin_cache_async(async_client, varbin_cache_settings):
+ cache = await async_client.get_or_create_cache(varbin_cache_settings)
+ yield cache
+ await cache.destroy()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+ 'value', VARBIN_TEST_PARAMS
+)
+async def test_cache_varbinary_handling_async(async_client, varbin_cache_async, value):
+ await varbin_cache_async.put(1, value)
+ async with async_client.sql(VARBIN_CACHE_SELECT_QUERY) as cursor:
+ async for row in cursor:
+ assert isinstance(row[1], bytes)
+ assert row[1] == value
+ break