You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/02/08 14:12:50 UTC
[ignite-python-thin-client] branch master updated: IGNITE-13967:
Optimizations and refactoring of parsing
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 2ead7b9 IGNITE-13967: Optimizations and refactoring of parsing
2ead7b9 is described below
commit 2ead7b928c82dd212273789a275b717ecafca295
Author: Ivan Dashchinskiy <iv...@gmail.com>
AuthorDate: Mon Feb 8 17:11:05 2021 +0300
IGNITE-13967: Optimizations and refactoring of parsing
This closes #10
---
.gitignore | 4 +-
pyignite/api/affinity.py | 8 +-
pyignite/api/binary.py | 70 ++++---
pyignite/binary.py | 47 +++--
pyignite/cache.py | 4 +-
pyignite/connection/__init__.py | 216 ++++++++-----------
pyignite/connection/handshake.py | 5 +-
pyignite/datatypes/__init__.py | 19 ++
pyignite/datatypes/cache_properties.py | 18 +-
pyignite/datatypes/complex.py | 229 ++++++---------------
pyignite/datatypes/internal.py | 99 +++++----
pyignite/datatypes/null_object.py | 55 ++++-
pyignite/datatypes/primitive.py | 43 ++--
pyignite/datatypes/primitive_arrays.py | 61 +++---
pyignite/datatypes/primitive_objects.py | 32 ++-
pyignite/datatypes/standard.py | 210 +++++++------------
pyignite/queries/query.py | 40 ++--
pyignite/queries/response.py | 49 +++--
pyignite/{datatypes => stream}/__init__.py | 13 +-
pyignite/stream/binary_stream.py | 111 ++++++++++
pyignite/utils.py | 26 +--
requirements/tests.txt | 1 +
tests/config/ignite-config-ssl.xml | 51 -----
tests/config/ignite-config.xml | 39 ----
...te-config-base.xml => ignite-config.xml.jinja2} | 39 +++-
tests/config/{log4j.xml => log4j.xml.jinja2} | 4 +-
tests/conftest.py | 8 +-
tests/test_affinity_request_routing.py | 92 ++++++---
tests/test_affinity_single_connection.py | 4 -
tests/test_cache_composite_key_class_sql.py | 5 +-
tests/test_sql.py | 4 +-
tests/util.py | 71 +++----
tox.ini | 19 ++
33 files changed, 787 insertions(+), 909 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7372921..d28510c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,8 @@
.eggs
.pytest_cache
.tox
+tests/config/*.xml
+junit*.xml
pyignite.egg-info
ignite-log-*
-__pycache__
\ No newline at end of file
+__pycache__
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index d28cfb8..16148a1 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -55,12 +55,12 @@ empty_node_mapping = Struct([])
partition_mapping = StructArray([
('is_applicable', Bool),
- ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
- lambda ctx: ctx['is_applicable'] is True,
+ ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+ lambda ctx: ctx['is_applicable'],
cache_mapping, empty_cache_mapping)),
- ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
- lambda ctx: ctx['is_applicable'] is True,
+ ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+ lambda ctx: ctx['is_applicable'],
node_mapping, empty_node_mapping)),
])
diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py
index 722001a..0e63c17 100644
--- a/pyignite/api/binary.py
+++ b/pyignite/api/binary.py
@@ -24,16 +24,15 @@ from pyignite.queries import Query
from pyignite.queries.op_codes import *
from pyignite.utils import int_overflow, entity_id
from .result import APIResult
+from ..stream import BinaryStream, READ_BACKWARD
from ..queries.response import Response
-def get_binary_type(
- connection: 'Connection', binary_type: Union[str, int], query_id=None,
-) -> APIResult:
+def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
"""
Gets the binary type information by type ID.
- :param connection: connection to Ignite server,
+ :param conn: connection to Ignite server,
:param binary_type: binary type name or ID,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
@@ -49,39 +48,42 @@ def get_binary_type(
query_id=query_id,
)
- _, send_buffer = query_struct.from_python({
- 'type_id': entity_id(binary_type),
- })
- connection.send(send_buffer)
+ with BinaryStream(conn) as stream:
+ query_struct.from_python(stream, {
+ 'type_id': entity_id(binary_type),
+ })
+ conn.send(stream.getbuffer())
- response_head_struct = Response(protocol_version=connection.get_protocol_version(),
+ response_head_struct = Response(protocol_version=conn.get_protocol_version(),
following=[('type_exists', Bool)])
- response_head_type, recv_buffer = response_head_struct.parse(connection)
- response_head = response_head_type.from_buffer_copy(recv_buffer)
- response_parts = []
- if response_head.type_exists:
- resp_body_type, resp_body_buffer = body_struct.parse(connection)
- response_parts.append(('body', resp_body_type))
- resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
- recv_buffer += resp_body_buffer
- if resp_body.is_enum:
- resp_enum, resp_enum_buffer = enum_struct.parse(connection)
- response_parts.append(('enums', resp_enum))
- recv_buffer += resp_enum_buffer
- resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)
- response_parts.append(('schema', resp_schema_type))
- recv_buffer += resp_schema_buffer
-
- response_class = type(
- 'GetBinaryTypeResponse',
- (response_head_type,),
- {
- '_pack_': 1,
- '_fields_': response_parts,
- }
- )
- response = response_class.from_buffer_copy(recv_buffer)
+ with BinaryStream(conn, conn.recv()) as stream:
+ init_pos = stream.tell()
+ response_head_type = response_head_struct.parse(stream)
+ response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD)
+
+ response_parts = []
+ if response_head.type_exists:
+ resp_body_type = body_struct.parse(stream)
+ response_parts.append(('body', resp_body_type))
+ resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD)
+ if resp_body.is_enum:
+ resp_enum = enum_struct.parse(stream)
+ response_parts.append(('enums', resp_enum))
+
+ resp_schema_type = schema_struct.parse(stream)
+ response_parts.append(('schema', resp_schema_type))
+
+ response_class = type(
+ 'GetBinaryTypeResponse',
+ (response_head_type,),
+ {
+ '_pack_': 1,
+ '_fields_': response_parts,
+ }
+ )
+ response = stream.read_ctype(response_class, position=init_pos)
+
result = APIResult(response)
if result.status != 0:
return result
diff --git a/pyignite/binary.py b/pyignite/binary.py
index 5d76c1b..da62bb5 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -102,18 +102,17 @@ class GenericObjectMeta(GenericObjectPropsMeta):
mcs, name, (GenericObjectProps, )+base_classes, namespace
)
- def _build(self, client: 'Client' = None) -> int:
+ def _from_python(self, stream, save_to_buf=False):
"""
Method for building binary representation of the Generic object
and calculating a hashcode from it.
:param self: Generic object instance,
- :param client: (optional) connection to Ignite cluster,
+ :param stream: BinaryStream
+ :param save_to_buf: Optional. If True, save serialized data to buffer.
"""
- if client is None:
- compact_footer = True
- else:
- compact_footer = client.compact_footer
+
+ compact_footer = stream.compact_footer
# prepare header
header_class = BinaryObject.build_header()
@@ -129,18 +128,19 @@ class GenericObjectMeta(GenericObjectPropsMeta):
header.type_id = self.type_id
header.schema_id = self.schema_id
+ header_len = ctypes.sizeof(header_class)
+ initial_pos = stream.tell()
+
# create fields and calculate offsets
offsets = [ctypes.sizeof(header_class)]
- field_buffer = bytearray()
schema_items = list(self.schema.items())
+
+ stream.seek(initial_pos + header_len)
for field_name, field_type in schema_items:
- partial_buffer = field_type.from_python(
- getattr(
- self, field_name, getattr(field_type, 'default', None)
- )
- )
- offsets.append(max(offsets) + len(partial_buffer))
- field_buffer += partial_buffer
+ val = getattr(self, field_name, getattr(field_type, 'default', None))
+ field_start_pos = stream.tell()
+ field_type.from_python(stream, val)
+ offsets.append(max(offsets) + stream.tell() - field_start_pos)
offsets = offsets[:-1]
@@ -160,15 +160,18 @@ class GenericObjectMeta(GenericObjectPropsMeta):
schema[i].offset = offset
# calculate size and hash code
- header.schema_offset = (
- ctypes.sizeof(header_class)
- + len(field_buffer)
- )
+ fields_data_len = stream.tell() - initial_pos - header_len
+ header.schema_offset = fields_data_len + header_len
header.length = header.schema_offset + ctypes.sizeof(schema_class)
- header.hash_code = hashcode(field_buffer)
+ header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len)
+
+ stream.seek(initial_pos)
+ stream.write(header)
+ stream.seek(initial_pos + header.schema_offset)
+ stream.write(schema)
- # reuse the results
- self._buffer = bytes(header) + field_buffer + bytes(schema)
+ if save_to_buf:
+ self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
self._hashcode = header.hash_code
def _setattr(self, attr_name: str, attr_value: Any):
@@ -180,7 +183,7 @@ class GenericObjectMeta(GenericObjectPropsMeta):
# `super()` is really need these parameters
super(result, self).__setattr__(attr_name, attr_value)
- setattr(result, _build.__name__, _build)
+ setattr(result, _from_python.__name__, _from_python)
setattr(result, '__setattr__', _setattr)
setattr(result, '_buffer', None)
setattr(result, '_hashcode', None)
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 64093e8..dd7dac4 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -17,7 +17,7 @@ import time
from typing import Any, Dict, Iterable, Optional, Tuple, Union
from .constants import *
-from .binary import GenericObjectMeta
+from .binary import GenericObjectMeta, unwrap_binary
from .datatypes import prop_codes
from .datatypes.internal import AnyDataObject
from .exceptions import (
@@ -26,7 +26,7 @@ from .exceptions import (
)
from .utils import (
cache_id, get_field_by_id, is_wrapped,
- status_to_exception, unsigned, unwrap_binary,
+ status_to_exception, unsigned
)
from .api.cache_config import (
cache_create, cache_create_with_config,
diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py
index cf40718..0e793f8 100644
--- a/pyignite/connection/__init__.py
+++ b/pyignite/connection/__init__.py
@@ -35,7 +35,7 @@ as well as Ignite protocol handshaking.
from collections import OrderedDict
import socket
-from threading import Lock
+from threading import RLock
from typing import Union
from pyignite.constants import *
@@ -52,6 +52,8 @@ from .ssl import wrap
__all__ = ['Connection']
+from ..stream import BinaryStream, READ_BACKWARD
+
class Connection:
"""
@@ -60,8 +62,7 @@ class Connection:
* socket wrapper. Detects fragmentation and network errors. See also
https://docs.python.org/3/howto/sockets.html,
- * binary protocol connector. Incapsulates handshake, data read-ahead and
- failover reconnection.
+ * binary protocol connector. Incapsulates handshake and failover reconnection.
"""
_socket = None
@@ -72,7 +73,6 @@ class Connection:
host = None
port = None
timeout = None
- prefetch = None
username = None
password = None
ssl_params = {}
@@ -97,7 +97,7 @@ class Connection:
).format(param))
def __init__(
- self, client: 'Client', prefetch: bytes = b'', timeout: int = None,
+ self, client: 'Client', timeout: float = 2.0,
username: str = None, password: str = None, **ssl_params
):
"""
@@ -107,8 +107,6 @@ class Connection:
https://docs.python.org/3/library/ssl.html#ssl-certificates.
:param client: Ignite client object,
- :param prefetch: (optional) initialize the read-ahead data buffer.
- Empty by default,
:param timeout: (optional) sets timeout (in seconds) for each socket
operation including `connect`. 0 means non-blocking mode, which is
virtually guaranteed to fail. Can accept integer or float value.
@@ -143,7 +141,6 @@ class Connection:
:param password: (optional) password to authenticate to Ignite cluster.
"""
self.client = client
- self.prefetch = prefetch
self.timeout = timeout
self.username = username
self.password = password
@@ -152,7 +149,8 @@ class Connection:
ssl_params['use_ssl'] = True
self.ssl_params = ssl_params
self._failed = False
- self._in_use = Lock()
+ self._mux = RLock()
+ self._in_use = False
@property
def socket(self) -> socket.socket:
@@ -162,17 +160,20 @@ class Connection:
@property
def closed(self) -> bool:
""" Tells if socket is closed. """
- return self._socket is None
+ with self._mux:
+ return self._socket is None
@property
def failed(self) -> bool:
""" Tells if connection is failed. """
- return self._failed
+ with self._mux:
+ return self._failed
@property
def alive(self) -> bool:
""" Tells if connection is up and no failure detected. """
- return not (self._failed or self.closed)
+ with self._mux:
+ return not (self._failed or self.closed)
def __repr__(self) -> str:
return '{}:{}'.format(self.host or '?', self.port or '?')
@@ -189,8 +190,10 @@ class Connection:
def _fail(self):
""" set client to failed state. """
- self._failed = True
- self._in_use.release()
+ with self._mux:
+ self._failed = True
+
+ self._in_use = False
def read_response(self) -> Union[dict, OrderedDict]:
"""
@@ -202,26 +205,27 @@ class Connection:
('length', Int),
('op_code', Byte),
])
- start_class, start_buffer = response_start.parse(self)
- start = start_class.from_buffer_copy(start_buffer)
- data = response_start.to_python(start)
- response_end = None
- if data['op_code'] == 0:
- response_end = Struct([
- ('version_major', Short),
- ('version_minor', Short),
- ('version_patch', Short),
- ('message', String),
- ])
- elif self.get_protocol_version() >= (1, 4, 0):
- response_end = Struct([
- ('node_uuid', UUIDObject),
- ])
- if response_end:
- end_class, end_buffer = response_end.parse(self)
- end = end_class.from_buffer_copy(end_buffer)
- data.update(response_end.to_python(end))
- return data
+ with BinaryStream(self, self.recv()) as stream:
+ start_class = response_start.parse(stream)
+ start = stream.read_ctype(start_class, direction=READ_BACKWARD)
+ data = response_start.to_python(start)
+ response_end = None
+ if data['op_code'] == 0:
+ response_end = Struct([
+ ('version_major', Short),
+ ('version_minor', Short),
+ ('version_patch', Short),
+ ('message', String),
+ ])
+ elif self.get_protocol_version() >= (1, 4, 0):
+ response_end = Struct([
+ ('node_uuid', UUIDObject),
+ ])
+ if response_end:
+ end_class = response_end.parse(stream)
+ end = stream.read_ctype(end_class, direction=READ_BACKWARD)
+ data.update(response_end.to_python(end))
+ return data
def connect(
self, host: str = None, port: int = None
@@ -234,9 +238,10 @@ class Connection:
"""
detecting_protocol = False
- # go non-blocking for faster reconnect
- if not self._in_use.acquire(blocking=False):
- raise ConnectionError('Connection is in use.')
+ with self._mux:
+ if self._in_use:
+ raise ConnectionError('Connection is in use.')
+ self._in_use = True
# choose highest version first
if self.client.protocol_version is None:
@@ -289,7 +294,11 @@ class Connection:
self.username,
self.password
)
- self.send(hs_request)
+
+ with BinaryStream(self) as stream:
+ hs_request.from_python(stream)
+ self.send(stream.getbuffer())
+
hs_response = self.read_response()
if hs_response['op_code'] == 0:
# disconnect but keep in use
@@ -345,12 +354,7 @@ class Connection:
if not self.failed:
return
- # return connection to initial state regardless of use lock
- self.close(release=False)
- try:
- self._in_use.release()
- except RuntimeError:
- pass
+ self.close()
# connect and silence the connection errors
try:
@@ -370,20 +374,7 @@ class Connection:
to.host = self.host
to.port = self.port
- def clone(self, prefetch: bytes = b'') -> 'Connection':
- """
- Clones this connection in its current state.
-
- :return: `Connection` object.
- """
- clone = self.__class__(self.client, **self.ssl_params)
- self._transfer_params(to=clone)
- if self.alive:
- clone.connect(self.host, self.port)
- clone.prefetch = prefetch
- return clone
-
- def send(self, data: bytes, flags=None):
+ def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
"""
Send data down the socket.
@@ -396,70 +387,45 @@ class Connection:
kwargs = {}
if flags is not None:
kwargs['flags'] = flags
- data = bytes(data)
- total_bytes_sent = 0
-
- while total_bytes_sent < len(data):
- try:
- bytes_sent = self.socket.send(
- data[total_bytes_sent:],
- **kwargs
- )
- except connection_errors:
- self._fail()
- self.reconnect()
- raise
- if bytes_sent == 0:
- self._fail()
- self.reconnect()
- raise SocketError('Connection broken.')
- total_bytes_sent += bytes_sent
-
- def recv(self, buffersize, flags=None) -> bytes:
- """
- Receive data from socket or read-ahead buffer.
- :param buffersize: bytes to receive,
- :param flags: (optional) OS-specific flags,
- :return: data received.
- """
+ try:
+ self.socket.sendall(data, **kwargs)
+ except Exception:
+ self._fail()
+ self.reconnect()
+ raise
+
+ def recv(self, flags=None) -> bytearray:
+ def _recv(buffer, num_bytes):
+ bytes_to_receive = num_bytes
+ while bytes_to_receive > 0:
+ try:
+ bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs)
+ if bytes_rcvd == 0:
+ raise SocketError('Connection broken.')
+ except connection_errors:
+ self._fail()
+ self.reconnect()
+ raise
+
+ buffer = buffer[bytes_rcvd:]
+ bytes_to_receive -= bytes_rcvd
+
if self.closed:
raise SocketError('Attempt to use closed connection.')
- pref_size = len(self.prefetch)
- if buffersize > pref_size:
- result = self.prefetch
- self.prefetch = b''
- try:
- result += self._recv(buffersize-pref_size, flags)
- except connection_errors:
- self._fail()
- self.reconnect()
- raise
- return result
- else:
- result = self.prefetch[:buffersize]
- self.prefetch = self.prefetch[buffersize:]
- return result
-
- def _recv(self, buffersize, flags=None) -> bytes:
- """
- Handle socket data reading.
- """
kwargs = {}
if flags is not None:
kwargs['flags'] = flags
- chunks = []
- bytes_rcvd = 0
- while bytes_rcvd < buffersize:
- chunk = self.socket.recv(buffersize-bytes_rcvd, **kwargs)
- if chunk == b'':
- raise SocketError('Connection broken.')
- chunks.append(chunk)
- bytes_rcvd += len(chunk)
+ data = bytearray(4)
+ _recv(memoryview(data), 4)
+ response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
+
+ data.extend(bytearray(response_len))
+ _recv(memoryview(data)[4:], response_len)
+ return data
- return b''.join(chunks)
def close(self, release=True):
"""
@@ -467,16 +433,14 @@ class Connection:
not required, since sockets are automatically closed when
garbage-collected.
"""
- if release:
- try:
- self._in_use.release()
- except RuntimeError:
- pass
-
- if self._socket:
- try:
- self._socket.shutdown(socket.SHUT_RDWR)
- self._socket.close()
- except connection_errors:
- pass
- self._socket = None
+ with self._mux:
+ if self._socket:
+ try:
+ self._socket.shutdown(socket.SHUT_RDWR)
+ self._socket.close()
+ except connection_errors:
+ pass
+ self._socket = None
+
+ if release:
+ self._in_use = False
diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py
index 2e0264f..3315c4e 100644
--- a/pyignite/connection/handshake.py
+++ b/pyignite/connection/handshake.py
@@ -50,7 +50,7 @@ class HandshakeRequest:
])
self.handshake_struct = Struct(fields)
- def __bytes__(self) -> bytes:
+ def from_python(self, stream):
handshake_data = {
'length': 8,
'op_code': OP_HANDSHAKE,
@@ -69,4 +69,5 @@ class HandshakeRequest:
len(self.username),
len(self.password),
])
- return self.handshake_struct.from_python(handshake_data)
+
+ self.handshake_struct.from_python(stream, handshake_data)
diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py
index 5024f79..49860bd 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/datatypes/__init__.py
@@ -25,3 +25,22 @@ from .primitive import *
from .primitive_arrays import *
from .primitive_objects import *
from .standard import *
+from ..stream import BinaryStream, READ_BACKWARD
+
+
+def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
+ """
+ Unwrap wrapped BinaryObject and convert it to Python data.
+
+ :param client: connection to Ignite cluster,
+ :param wrapped: `WrappedDataObject` value,
+ :return: dict representing wrapped BinaryObject.
+ """
+ from pyignite.datatypes.complex import BinaryObject
+
+ blob, offset = wrapped
+ with BinaryStream(client.random_node, blob) as stream:
+ data_class = BinaryObject.parse(stream)
+ result = BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client)
+
+ return result
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index e94db5f..eadaef9 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -92,10 +92,11 @@ class PropBase:
)
@classmethod
- def parse(cls, connection: 'Connection'):
+ def parse(cls, stream):
+ init_pos = stream.tell()
header_class = cls.build_header()
- header_buffer = connection.recv(ctypes.sizeof(header_class))
- data_class, data_buffer = cls.prop_data_class.parse(connection)
+ data_class = cls.prop_data_class.parse(stream)
+
prop_class = type(
cls.__name__,
(header_class,),
@@ -106,7 +107,9 @@ class PropBase:
],
}
)
- return prop_class, header_buffer + data_buffer
+
+ stream.seek(init_pos + ctypes.sizeof(prop_class))
+ return prop_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -115,11 +118,12 @@ class PropBase:
)
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
header_class = cls.build_header()
header = header_class()
header.prop_code = cls.prop_code
- return bytes(header) + cls.prop_data_class.from_python(value)
+ stream.write(bytes(header))
+ cls.prop_data_class.from_python(stream, value)
class PropName(PropBase):
@@ -275,7 +279,7 @@ class PropStatisticsEnabled(PropBase):
class AnyProperty(PropBase):
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
raise Exception(
'You must choose a certain type '
'for your cache configuration property'
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index 6860583..aed3cda 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -15,27 +15,27 @@
from collections import OrderedDict
import ctypes
-import inspect
+from io import SEEK_CUR
from typing import Iterable, Dict
from pyignite.constants import *
from pyignite.exceptions import ParseError
-
from .base import IgniteDataType
from .internal import AnyDataObject, infer_from_python
from .type_codes import *
from .type_ids import *
from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
__all__ = [
'Map', 'ObjectArrayObject', 'CollectionObject', 'MapObject',
'WrappedDataObject', 'BinaryObject',
]
+from ..stream import BinaryStream
+
-class ObjectArrayObject(IgniteDataType):
+class ObjectArrayObject(IgniteDataType, Nullable):
"""
Array of Ignite objects of any consistent type. Its Python representation
is tuple(type_id, iterable of any type). The only type ID that makes sense
@@ -69,20 +69,14 @@ class ObjectArrayObject(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
- fields = []
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+ fields = []
for i in range(header.length):
- c_type, buffer_fragment = AnyDataObject.parse(client)
- buffer += buffer_fragment
+ c_type = AnyDataObject.parse(stream)
fields.append(('element_{}'.format(i), c_type))
final_class = type(
@@ -93,15 +87,13 @@ class ObjectArrayObject(IgniteDataType):
'_fields_': fields,
}
)
- return final_class, buffer
+
+ return final_class
@classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
+ def to_python_not_null(cls, ctype_object, *args, **kwargs):
result = []
- length = getattr(ctype_object, "length", None)
- if length is None:
- return None
- for i in range(length):
+ for i in range(ctype_object.length):
result.append(
AnyDataObject.to_python(
getattr(ctype_object, 'element_{}'.format(i)),
@@ -111,10 +103,7 @@ class ObjectArrayObject(IgniteDataType):
return ctype_object.type_id, result
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value):
type_or_id, value = value
header_class = cls.build_header()
header = header_class()
@@ -129,14 +118,13 @@ class ObjectArrayObject(IgniteDataType):
length = 1
header.length = length
header.type_id = type_or_id
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += infer_from_python(x)
- return bytes(buffer)
+ infer_from_python(stream, x)
-class WrappedDataObject(IgniteDataType):
+class WrappedDataObject(IgniteDataType, Nullable):
"""
One or more binary objects can be wrapped in an array. This allows reading,
storing, passing and writing objects efficiently without understanding
@@ -162,15 +150,9 @@ class WrappedDataObject(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
+ header = stream.read_ctype(header_class)
final_class = type(
cls.__name__,
@@ -183,21 +165,20 @@ class WrappedDataObject(IgniteDataType):
],
}
)
- buffer += client.recv(
- ctypes.sizeof(final_class) - ctypes.sizeof(header_class)
- )
- return final_class, buffer
+
+ stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
return bytes(ctype_object.payload), ctype_object.offset
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
raise ParseError('Send unwrapped data.')
-class CollectionObject(IgniteDataType):
+class CollectionObject(IgniteDataType, Nullable):
"""
Similar to object array, but contains platform-agnostic deserialization
type hint instead of type ID.
@@ -260,20 +241,14 @@ class CollectionObject(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
- fields = []
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+ fields = []
for i in range(header.length):
- c_type, buffer_fragment = AnyDataObject.parse(client)
- buffer += buffer_fragment
+ c_type = AnyDataObject.parse(stream)
fields.append(('element_{}'.format(i), c_type))
final_class = type(
@@ -284,7 +259,7 @@ class CollectionObject(IgniteDataType):
'_fields_': fields,
}
)
- return final_class, buffer
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -302,10 +277,7 @@ class CollectionObject(IgniteDataType):
return ctype_object.type, result
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value):
type_or_id, value = value
header_class = cls.build_header()
header = header_class()
@@ -320,14 +292,13 @@ class CollectionObject(IgniteDataType):
length = 1
header.length = length
header.type = type_or_id
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += infer_from_python(x)
- return bytes(buffer)
+ infer_from_python(stream, x)
-class Map(IgniteDataType):
+class Map(IgniteDataType, Nullable):
"""
Dictionary type, payload-only.
@@ -358,20 +329,14 @@ class Map(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
- fields = []
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+ fields = []
for i in range(header.length << 1):
- c_type, buffer_fragment = AnyDataObject.parse(client)
- buffer += buffer_fragment
+ c_type = AnyDataObject.parse(stream)
fields.append(('element_{}'.format(i), c_type))
final_class = type(
@@ -382,7 +347,7 @@ class Map(IgniteDataType):
'_fields_': fields,
}
)
- return final_class, buffer
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -402,7 +367,7 @@ class Map(IgniteDataType):
return result
@classmethod
- def from_python(cls, value, type_id=None):
+ def from_python(cls, stream, value, type_id=None):
header_class = cls.build_header()
header = header_class()
length = len(value)
@@ -414,12 +379,11 @@ class Map(IgniteDataType):
)
if hasattr(header, 'type'):
header.type = type_id
- buffer = bytearray(header)
+ stream.write(header)
for k, v in value.items():
- buffer += infer_from_python(k)
- buffer += infer_from_python(v)
- return bytes(buffer)
+ infer_from_python(stream, k)
+ infer_from_python(stream, v)
class MapObject(Map):
@@ -462,15 +426,16 @@ class MapObject(Map):
)
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
if value is None:
- return Null.from_python()
+ Null.from_python(stream)
+ return
type_id, value = value
- return super().from_python(value, type_id)
+ super().from_python(stream, value, type_id)
-class BinaryObject(IgniteDataType):
+class BinaryObject(IgniteDataType, Nullable):
_type_id = TYPE_BINARY_OBJ
type_code = TC_COMPLEX_OBJECT
@@ -482,42 +447,14 @@ class BinaryObject(IgniteDataType):
COMPACT_FOOTER = 0x0020
@staticmethod
- def find_client():
- """
- A nice hack. Extracts the nearest `Client` instance from the
- call stack.
- """
- from pyignite import Client
- from pyignite.connection import Connection
-
- frame = None
- try:
- for rec in inspect.stack()[2:]:
- frame = rec[0]
- code = frame.f_code
- for varname in code.co_varnames:
- suspect = frame.f_locals.get(varname)
- if isinstance(suspect, Client):
- return suspect
- if isinstance(suspect, Connection):
- return suspect.client
- finally:
- del frame
-
- @staticmethod
- def hashcode(
- value: object, client: 'Client' = None, *args, **kwargs
- ) -> int:
+ def hashcode(value: object, client: None) -> int:
# binary objects's hashcode implementation is special in the sense
# that you need to fully serialize the object to calculate
# its hashcode
- if value._hashcode is None:
+ if not value._hashcode and client :
- # …and for to serialize it you need a Client instance
- if client is None:
- client = BinaryObject.find_client()
-
- value._build(client)
+ with BinaryStream(client.random_node) as stream:
+ value._from_python(stream, save_to_buf=True)
return value._hashcode
@@ -565,41 +502,25 @@ class BinaryObject(IgniteDataType):
},
)
- @staticmethod
- def get_dataclass(conn: 'Connection', header) -> OrderedDict:
- # get field names from outer space
- result = conn.client.query_binary_type(
- header.type_id,
- header.schema_id
- )
- if not result:
- raise ParseError('Binary type is not registered')
- return result
-
@classmethod
- def parse(cls, client: 'Client'):
+ def parse_not_null(cls, stream):
from pyignite.datatypes import Struct
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
header_class = cls.build_header()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
# ignore full schema, always retrieve fields' types and order
# from complex types registry
- data_class = cls.get_dataclass(client, header)
+ data_class = stream.get_dataclass(header)
fields = data_class.schema.items()
object_fields_struct = Struct(fields)
- object_fields, object_fields_buffer = object_fields_struct.parse(client)
- buffer += object_fields_buffer
+ object_fields = object_fields_struct.parse(stream)
final_class_fields = [('object_fields', object_fields)]
if header.flags & cls.HAS_SCHEMA:
schema = cls.schema_type(header.flags) * len(fields)
- buffer += client.recv(ctypes.sizeof(schema))
+ stream.seek(ctypes.sizeof(schema), SEEK_CUR)
final_class_fields.append(('schema', schema))
final_class = type(
@@ -611,8 +532,8 @@ class BinaryObject(IgniteDataType):
}
)
# register schema encoding approach
- client.compact_footer = bool(header.flags & cls.COMPACT_FOOTER)
- return final_class, buffer
+ stream.compact_footer = bool(header.flags & cls.COMPACT_FOOTER)
+ return final_class
@classmethod
def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
@@ -642,23 +563,9 @@ class BinaryObject(IgniteDataType):
return result
@classmethod
- def from_python(cls, value: object):
- if value is None:
- return Null.from_python()
-
- if getattr(value, '_buffer', None) is None:
- client = cls.find_client()
-
- # if no client can be found, the class of the `value` is discarded
- # and the new dataclass is automatically registered later on
- if client:
- client.register_binary_type(value.__class__)
- else:
- raise Warning(
- 'Can not register binary type {}'.format(value.type_name)
- )
-
- # build binary representation
- value._build(client)
-
- return value._buffer
+ def from_python_not_null(cls, stream, value):
+ stream.register_binary_type(value.__class__)
+ if getattr(value, '_buffer', None):
+ stream.write(value._buffer)
+ else:
+ value._from_python(stream)
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 23b9cc4..0111a22 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -17,6 +17,7 @@ from collections import OrderedDict
import ctypes
import decimal
from datetime import date, datetime, timedelta
+from io import SEEK_CUR
from typing import Any, Tuple, Union, Callable
import uuid
@@ -33,6 +34,8 @@ __all__ = [
'infer_from_python',
]
+from ..stream import READ_BACKWARD
+
def tc_map(key: bytes, _memo_map: dict = {}):
"""
@@ -119,11 +122,12 @@ class Conditional:
self.var1 = var1
self.var2 = var2
- def parse(self, client: 'Client', context):
- return self.var1.parse(client) if self.predicate1(context) else self.var2.parse(client)
+ def parse(self, stream, context):
+ return self.var1.parse(stream) if self.predicate1(context) else self.var2.parse(stream)
def to_python(self, ctype_object, context, *args, **kwargs):
- return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context) else self.var2.to_python(ctype_object, *args, **kwargs)
+ return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context)\
+ else self.var2.to_python(ctype_object, *args, **kwargs)
@attr.s
class StructArray:
@@ -144,14 +148,17 @@ class StructArray:
},
)
- def parse(self, client: 'Client'):
- buffer = client.recv(ctypes.sizeof(self.counter_type))
- length = int.from_bytes(buffer, byteorder=PROTOCOL_BYTE_ORDER)
- fields = []
+ def parse(self, stream):
+ counter_type_len = ctypes.sizeof(self.counter_type)
+ length = int.from_bytes(
+ stream.mem_view(offset=counter_type_len),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
+ stream.seek(counter_type_len, SEEK_CUR)
+ fields = []
for i in range(length):
- c_type, buffer_fragment = Struct(self.following).parse(client)
- buffer += buffer_fragment
+ c_type = Struct(self.following).parse(stream)
fields.append(('element_{}'.format(i), c_type))
data_class = type(
@@ -163,7 +170,7 @@ class StructArray:
},
)
- return data_class, buffer
+ return data_class
def to_python(self, ctype_object, *args, **kwargs):
result = []
@@ -179,20 +186,19 @@ class StructArray:
)
return result
- def from_python(self, value):
+ def from_python(self, stream, value):
length = len(value)
header_class = self.build_header_class()
header = header_class()
header.length = length
- buffer = bytearray(header)
+
+ stream.write(header)
for i, v in enumerate(value):
for default_key, default_value in self.defaults.items():
v.setdefault(default_key, default_value)
for name, el_class in self.following:
- buffer += el_class.from_python(v[name])
-
- return bytes(buffer)
+ el_class.from_python(stream, v[name])
@attr.s
@@ -202,21 +208,13 @@ class Struct:
dict_type = attr.ib(default=OrderedDict)
defaults = attr.ib(type=dict, default={})
- def parse(
- self, client: 'Client'
- ) -> Tuple[ctypes.LittleEndianStructure, bytes]:
- buffer = b''
- fields = []
- values = {}
-
+ def parse(self, stream):
+ fields, values = [], {}
for name, c_type in self.fields:
is_cond = isinstance(c_type, Conditional)
- c_type, buffer_fragment = c_type.parse(client, values) if is_cond else c_type.parse(client)
- buffer += buffer_fragment
-
+ c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream)
fields.append((name, c_type))
-
- values[name] = buffer_fragment
+ values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
data_class = type(
'Struct',
@@ -227,7 +225,7 @@ class Struct:
},
)
- return data_class, buffer
+ return data_class
def to_python(
self, ctype_object, *args, **kwargs
@@ -245,16 +243,12 @@ class Struct:
)
return result
- def from_python(self, value) -> bytes:
- buffer = b''
-
+ def from_python(self, stream, value):
for default_key, default_value in self.defaults.items():
value.setdefault(default_key, default_value)
for name, el_class in self.fields:
- buffer += el_class.from_python(value[name])
-
- return buffer
+ el_class.from_python(stream, value[name])
class AnyDataObject:
@@ -299,14 +293,13 @@ class AnyDataObject:
return type_first
@classmethod
- def parse(cls, client: 'Client'):
- type_code = client.recv(ctypes.sizeof(ctypes.c_byte))
+ def parse(cls, stream):
+ type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte)))
try:
data_class = tc_map(type_code)
except KeyError:
raise ParseError('Unknown type code: `{}`'.format(type_code))
- client.prefetch += type_code
- return data_class.parse(client)
+ return data_class.parse(stream)
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -418,11 +411,12 @@ class AnyDataObject:
)
@classmethod
- def from_python(cls, value):
- return cls.map_python_type(value).from_python(value)
+ def from_python(cls, stream, value):
+ p_type = cls.map_python_type(value)
+ p_type.from_python(stream, value)
-def infer_from_python(value: Any):
+def infer_from_python(stream, value: Any):
"""
Convert pythonic value to ctypes buffer, type hint-aware.
@@ -433,7 +427,8 @@ def infer_from_python(value: Any):
value, data_type = value
else:
data_type = AnyDataObject
- return data_type.from_python(value)
+
+ data_type.from_python(stream, value)
@attr.s
@@ -455,15 +450,14 @@ class AnyDataArray(AnyDataObject):
}
)
- def parse(self, client: 'Client'):
+ def parse(self, stream):
header_class = self.build_header()
- buffer = client.recv(ctypes.sizeof(header_class))
- header = header_class.from_buffer_copy(buffer)
- fields = []
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+ fields = []
for i in range(header.length):
- c_type, buffer_fragment = super().parse(client)
- buffer += buffer_fragment
+ c_type = super().parse(stream)
fields.append(('element_{}'.format(i), c_type))
final_class = type(
@@ -474,7 +468,7 @@ class AnyDataArray(AnyDataObject):
'_fields_': fields,
}
)
- return final_class, buffer
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -491,7 +485,7 @@ class AnyDataArray(AnyDataObject):
)
return result
- def from_python(self, value):
+ def from_python(self, stream, value):
header_class = self.build_header()
header = header_class()
@@ -501,8 +495,7 @@ class AnyDataArray(AnyDataObject):
value = [value]
length = 1
header.length = length
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += infer_from_python(x)
- return bytes(buffer)
+ infer_from_python(stream, x)
diff --git a/pyignite/datatypes/null_object.py b/pyignite/datatypes/null_object.py
index 19b41c7..912ded8 100644
--- a/pyignite/datatypes/null_object.py
+++ b/pyignite/datatypes/null_object.py
@@ -20,6 +20,7 @@ There can't be null type, because null payload takes exactly 0 bytes.
"""
import ctypes
+from io import SEEK_CUR
from typing import Any
from .base import IgniteDataType
@@ -28,6 +29,8 @@ from .type_codes import TC_NULL
__all__ = ['Null']
+from ..constants import PROTOCOL_BYTE_ORDER
+
class Null(IgniteDataType):
default = None
@@ -55,16 +58,56 @@ class Null(IgniteDataType):
return cls._object_c_type
@classmethod
- def parse(cls, client: 'Client'):
- buffer = client.recv(ctypes.sizeof(ctypes.c_byte))
- data_type = cls.build_c_type()
- return data_type, buffer
+ def parse(cls, stream):
+ init_pos, offset = stream.tell(), ctypes.sizeof(ctypes.c_byte)
+ stream.seek(offset, SEEK_CUR)
+ return cls.build_c_type()
@staticmethod
def to_python(*args, **kwargs):
return None
@staticmethod
- def from_python(*args):
- return TC_NULL
+ def from_python(stream, *args):
+ stream.write(TC_NULL)
+
+
+class Nullable:
+ @classmethod
+ def parse_not_null(cls, stream):
+ raise NotImplementedError
+
+ @classmethod
+ def parse(cls, stream):
+ type_len = ctypes.sizeof(ctypes.c_byte)
+
+ if stream.mem_view(offset=type_len) == TC_NULL:
+ stream.seek(type_len, SEEK_CUR)
+ return Null.build_c_type()
+
+ return cls.parse_not_null(stream)
+ @classmethod
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+ raise NotImplementedError
+
+ @classmethod
+ def to_python(cls, ctypes_object, *args, **kwargs):
+ if ctypes_object.type_code == int.from_bytes(
+ TC_NULL,
+ byteorder=PROTOCOL_BYTE_ORDER
+ ):
+ return None
+
+ return cls.to_python_not_null(ctypes_object, *args, **kwargs)
+
+ @classmethod
+ def from_python_not_null(cls, stream, value):
+ raise NotImplementedError
+
+ @classmethod
+ def from_python(cls, stream, value):
+ if value is None:
+ Null.from_python(stream)
+ else:
+ cls.from_python_not_null(stream, value)
diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py
index d549fda..ffa2e32 100644
--- a/pyignite/datatypes/primitive.py
+++ b/pyignite/datatypes/primitive.py
@@ -15,7 +15,7 @@
import ctypes
import struct
-import sys
+from io import SEEK_CUR
from pyignite.constants import *
from .base import IgniteDataType
@@ -47,8 +47,10 @@ class Primitive(IgniteDataType):
c_type = None
@classmethod
- def parse(cls, client: 'Client'):
- return cls.c_type, client.recv(ctypes.sizeof(cls.c_type))
+ def parse(cls, stream):
+ init_pos, offset = stream.tell(), ctypes.sizeof(cls.c_type)
+ stream.seek(offset, SEEK_CUR)
+ return cls.c_type
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
@@ -61,8 +63,8 @@ class Byte(Primitive):
c_type = ctypes.c_byte
@classmethod
- def from_python(cls, value):
- return struct.pack("<b", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<b", value))
class Short(Primitive):
@@ -71,8 +73,8 @@ class Short(Primitive):
c_type = ctypes.c_short
@classmethod
- def from_python(cls, value):
- return struct.pack("<h", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<h", value))
class Int(Primitive):
@@ -81,8 +83,8 @@ class Int(Primitive):
c_type = ctypes.c_int
@classmethod
- def from_python(cls, value):
- return struct.pack("<i", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<i", value))
class Long(Primitive):
@@ -91,8 +93,8 @@ class Long(Primitive):
c_type = ctypes.c_longlong
@classmethod
- def from_python(cls, value):
- return struct.pack("<q", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<q", value))
class Float(Primitive):
@@ -101,8 +103,8 @@ class Float(Primitive):
c_type = ctypes.c_float
@classmethod
- def from_python(cls, value):
- return struct.pack("<f", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<f", value))
class Double(Primitive):
@@ -111,8 +113,8 @@ class Double(Primitive):
c_type = ctypes.c_double
@classmethod
- def from_python(cls, value):
- return struct.pack("<d", value)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<d", value))
class Char(Primitive):
@@ -128,16 +130,15 @@ class Char(Primitive):
).decode(PROTOCOL_CHAR_ENCODING)
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
if type(value) is str:
value = value.encode(PROTOCOL_CHAR_ENCODING)
# assuming either a bytes or an integer
if type(value) is bytes:
value = int.from_bytes(value, byteorder=PROTOCOL_BYTE_ORDER)
# assuming a valid integer
- return value.to_bytes(
- ctypes.sizeof(cls.c_type),
- byteorder=PROTOCOL_BYTE_ORDER
+ stream.write(
+ value.to_bytes(ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER)
)
@@ -151,5 +152,5 @@ class Bool(Primitive):
return ctype_object != 0
@classmethod
- def from_python(cls, value):
- return struct.pack("<b", 1 if value else 0)
+ def from_python(cls, stream, value):
+ stream.write(struct.pack("<b", 1 if value else 0))
diff --git a/pyignite/datatypes/primitive_arrays.py b/pyignite/datatypes/primitive_arrays.py
index 1b41728..7cb5b20 100644
--- a/pyignite/datatypes/primitive_arrays.py
+++ b/pyignite/datatypes/primitive_arrays.py
@@ -14,11 +14,13 @@
# limitations under the License.
import ctypes
+from io import SEEK_CUR
from typing import Any
from pyignite.constants import *
from . import Null
from .base import IgniteDataType
+from .null_object import Nullable
from .primitive import *
from .type_codes import *
from .type_ids import *
@@ -33,7 +35,7 @@ __all__ = [
]
-class PrimitiveArray(IgniteDataType):
+class PrimitiveArray(IgniteDataType, Nullable):
"""
Base class for array of primitives. Payload-only.
"""
@@ -61,15 +63,10 @@ class PrimitiveArray(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header_class()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
+ header = stream.read_ctype(header_class)
+
final_class = type(
cls.__name__,
(header_class,),
@@ -80,26 +77,18 @@ class PrimitiveArray(IgniteDataType):
],
}
)
- buffer += client.recv(
- ctypes.sizeof(final_class) - ctypes.sizeof(header_class)
- )
- return final_class, buffer
+ stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
- result = []
length = getattr(ctype_object, "length", None)
if length is None:
return None
- for i in range(length):
- result.append(ctype_object.data[i])
- return result
+ return [ctype_object.data[i] for i in range(ctype_object.length)]
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value):
header_class = cls.build_header_class()
header = header_class()
if hasattr(header, 'type_code'):
@@ -109,11 +98,10 @@ class PrimitiveArray(IgniteDataType):
)
length = len(value)
header.length = length
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += cls.primitive_type.from_python(x)
- return bytes(buffer)
+ cls.primitive_type.from_python(stream, x)
class ByteArray(PrimitiveArray):
@@ -130,14 +118,13 @@ class ByteArray(PrimitiveArray):
return bytearray(data)
@classmethod
- def from_python(cls, value):
+ def from_python(cls, stream, value):
header_class = cls.build_header_class()
header = header_class()
-
- # no need to iterate on bytes or bytearray
- # to create ByteArray data buffer
header.length = len(value)
- return bytes(bytearray(header) + bytearray(value))
+
+ stream.write(header)
+ stream.write(bytearray(value))
class ShortArray(PrimitiveArray):
@@ -224,20 +211,20 @@ class ByteArrayObject(PrimitiveArrayObject):
return ByteArray.to_python(ctype_object, *args, **kwargs)
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value):
header_class = cls.build_header_class()
header = header_class()
header.type_code = int.from_bytes(
cls.type_code,
byteorder=PROTOCOL_BYTE_ORDER
)
-
- # no need to iterate on bytes or bytearray
- # to create ByteArrayObject data buffer
header.length = len(value)
+ stream.write(header)
+
+ if isinstance(value, (bytes, bytearray)):
+ stream.write(value)
+ return
+
try:
# `value` is a `bytearray` or a sequence of integer values
# in range 0 to 255
@@ -253,7 +240,7 @@ class ByteArrayObject(PrimitiveArrayObject):
'byte must be in range(-128, 256)!'
) from None
- return bytes(bytearray(header) + value_buffer)
+ stream.write(value_buffer)
class ShortArrayObject(PrimitiveArrayObject):
diff --git a/pyignite/datatypes/primitive_objects.py b/pyignite/datatypes/primitive_objects.py
index 53f12d2..e942dd7 100644
--- a/pyignite/datatypes/primitive_objects.py
+++ b/pyignite/datatypes/primitive_objects.py
@@ -14,16 +14,15 @@
# limitations under the License.
import ctypes
+from io import SEEK_CUR
from pyignite.constants import *
from pyignite.utils import unsigned
-
from .base import IgniteDataType
from .type_codes import *
from .type_ids import *
from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
__all__ = [
'DataObject', 'ByteObject', 'ShortObject', 'IntObject', 'LongObject',
@@ -31,7 +30,7 @@ __all__ = [
]
-class DataObject(IgniteDataType):
+class DataObject(IgniteDataType, Nullable):
"""
Base class for primitive data objects.
@@ -61,22 +60,17 @@ class DataObject(IgniteDataType):
return cls._object_c_type
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
+ def parse_not_null(cls, stream):
data_type = cls.build_c_type()
- buffer = tc_type + client.recv(ctypes.sizeof(data_type) - len(tc_type))
- return data_type, buffer
+ stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+ return data_type
@staticmethod
def to_python(ctype_object, *args, **kwargs):
return getattr(ctype_object, "value", None)
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -84,7 +78,7 @@ class DataObject(IgniteDataType):
byteorder=PROTOCOL_BYTE_ORDER
)
data_object.value = value
- return bytes(data_object)
+ stream.write(data_object)
class ByteObject(DataObject):
@@ -201,18 +195,16 @@ class CharObject(DataObject):
).decode(PROTOCOL_CHAR_ENCODING)
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value):
if type(value) is str:
value = value.encode(PROTOCOL_CHAR_ENCODING)
# assuming either a bytes or an integer
if type(value) is bytes:
value = int.from_bytes(value, byteorder=PROTOCOL_BYTE_ORDER)
# assuming a valid integer
- return cls.type_code + value.to_bytes(
- ctypes.sizeof(cls.c_type),
- byteorder=PROTOCOL_BYTE_ORDER
+ stream.write(cls.type_code)
+ stream.write(
+ value.to_bytes(ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER)
)
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 0f16735..af50a8e 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -16,6 +16,7 @@
import ctypes
from datetime import date, datetime, time, timedelta
import decimal
+from io import SEEK_CUR
from math import ceil
from typing import Any, Tuple
import uuid
@@ -26,8 +27,7 @@ from .base import IgniteDataType
from .type_codes import *
from .type_ids import *
from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
__all__ = [
'String', 'DecimalObject', 'UUIDObject', 'TimestampObject', 'DateObject',
@@ -44,7 +44,7 @@ __all__ = [
]
-class StandardObject(IgniteDataType):
+class StandardObject(IgniteDataType, Nullable):
_type_name = None
_type_id = None
type_code = None
@@ -54,18 +54,13 @@ class StandardObject(IgniteDataType):
raise NotImplementedError('This object is generic')
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
- c_type = cls.build_c_type()
- buffer = tc_type + client.recv(ctypes.sizeof(c_type) - len(tc_type))
- return c_type, buffer
+ def parse_not_null(cls, stream):
+ data_type = cls.build_c_type()
+ stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+ return data_type
-class String(IgniteDataType):
+class String(IgniteDataType, Nullable):
"""
Pascal-style string: `c_int` counter, followed by count*bytes.
UTF-8-encoded, so that one character may take 1 to 4 bytes.
@@ -95,35 +90,25 @@ class String(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
- # String or Null
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
- buffer = tc_type + client.recv(ctypes.sizeof(ctypes.c_int))
- length = int.from_bytes(buffer[1:], byteorder=PROTOCOL_BYTE_ORDER)
+ def parse_not_null(cls, stream):
+ length = int.from_bytes(
+ stream.mem_view(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
+ byteorder=PROTOCOL_BYTE_ORDER
+ )
data_type = cls.build_c_type(length)
- buffer += client.recv(ctypes.sizeof(data_type) - len(buffer))
+ stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+ return data_type
- return data_type, buffer
-
- @staticmethod
- def to_python(ctype_object, *args, **kwargs):
- length = getattr(ctype_object, 'length', None)
- if length is None:
- return None
- elif length > 0:
+ @classmethod
+ def to_python_not_null(cls, ctype_object, *args, **kwargs):
+ if ctype_object.length > 0:
return ctype_object.data.decode(PROTOCOL_STRING_ENCODING)
- else:
- return ''
- @classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
+ return ''
+ @classmethod
+ def from_python_not_null(cls, stream, value):
if isinstance(value, str):
value = value.encode(PROTOCOL_STRING_ENCODING)
length = len(value)
@@ -135,10 +120,11 @@ class String(IgniteDataType):
)
data_object.length = length
data_object.data = value
- return bytes(data_object)
+
+ stream.write(data_object)
-class DecimalObject(IgniteDataType):
+class DecimalObject(IgniteDataType, Nullable):
_type_name = NAME_DECIMAL
_type_id = TYPE_DECIMAL
type_code = TC_DECIMAL
@@ -165,18 +151,10 @@ class DecimalObject(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
- # Decimal or Null
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_c_header()
- buffer = tc_type + client.recv(
- ctypes.sizeof(header_class)
- - len(tc_type)
- )
- header = header_class.from_buffer_copy(buffer)
+ header = stream.read_ctype(header_class)
+
data_type = type(
cls.__name__,
(header_class,),
@@ -187,17 +165,12 @@ class DecimalObject(IgniteDataType):
],
}
)
- buffer += client.recv(
- ctypes.sizeof(data_type)
- - ctypes.sizeof(header_class)
- )
- return data_type, buffer
- @classmethod
- def to_python(cls, ctype_object, *args, **kwargs):
- if getattr(ctype_object, 'length', None) is None:
- return None
+ stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+ return data_type
+ @classmethod
+ def to_python_not_null(cls, ctype_object, *args, **kwargs):
sign = 1 if ctype_object.data[0] & 0x80 else 0
data = ctype_object.data[1:]
data.insert(0, ctype_object.data[0] & 0x7f)
@@ -218,10 +191,7 @@ class DecimalObject(IgniteDataType):
return result
@classmethod
- def from_python(cls, value: decimal.Decimal):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value: decimal.Decimal):
sign, digits, scale = value.normalize().as_tuple()
integer = int(''.join([str(d) for d in digits]))
# calculate number of bytes (at least one, and not forget the sign bit)
@@ -257,7 +227,8 @@ class DecimalObject(IgniteDataType):
data_object.scale = -scale
for i in range(length):
data_object.data[i] = data[i]
- return bytes(data_object)
+
+ stream.write(data_object)
class UUIDObject(StandardObject):
@@ -300,10 +271,7 @@ class UUIDObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python(cls, value: uuid.UUID):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value: uuid.UUID):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -312,15 +280,11 @@ class UUIDObject(StandardObject):
)
for i, byte in zip(cls.UUID_BYTE_ORDER, bytearray(value.bytes)):
data_object.value[i] = byte
- return bytes(data_object)
+
+ stream.write(data_object)
@classmethod
- def to_python(cls, ctypes_object, *args, **kwargs):
- if ctypes_object.type_code == int.from_bytes(
- TC_NULL,
- byteorder=PROTOCOL_BYTE_ORDER
- ):
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
uuid_array = bytearray(ctypes_object.value)
return uuid.UUID(
bytes=bytes([uuid_array[i] for i in cls.UUID_BYTE_ORDER])
@@ -367,9 +331,7 @@ class TimestampObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python(cls, value: tuple):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value: tuple):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -378,15 +340,11 @@ class TimestampObject(StandardObject):
)
data_object.epoch = int(value[0].timestamp() * 1000)
data_object.fraction = value[1]
- return bytes(data_object)
+
+ stream.write(data_object)
@classmethod
- def to_python(cls, ctypes_object, *args, **kwargs):
- if ctypes_object.type_code == int.from_bytes(
- TC_NULL,
- byteorder=PROTOCOL_BYTE_ORDER
- ):
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return (
datetime.fromtimestamp(ctypes_object.epoch/1000),
ctypes_object.fraction
@@ -428,9 +386,7 @@ class DateObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python(cls, value: [date, datetime]):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value: [date, datetime]):
if type(value) is date:
value = datetime.combine(value, time())
data_type = cls.build_c_type()
@@ -440,15 +396,11 @@ class DateObject(StandardObject):
byteorder=PROTOCOL_BYTE_ORDER
)
data_object.epoch = int(value.timestamp() * 1000)
- return bytes(data_object)
+
+ stream.write(data_object)
@classmethod
- def to_python(cls, ctypes_object, *args, **kwargs):
- if ctypes_object.type_code == int.from_bytes(
- TC_NULL,
- byteorder=PROTOCOL_BYTE_ORDER
- ):
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return datetime.fromtimestamp(ctypes_object.epoch/1000)
@@ -486,9 +438,7 @@ class TimeObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python(cls, value: timedelta):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value: timedelta):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -496,15 +446,11 @@ class TimeObject(StandardObject):
byteorder=PROTOCOL_BYTE_ORDER
)
data_object.value = int(value.total_seconds() * 1000)
- return bytes(data_object)
+
+ stream.write(data_object)
@classmethod
- def to_python(cls, ctypes_object, *args, **kwargs):
- if ctypes_object.type_code == int.from_bytes(
- TC_NULL,
- byteorder=PROTOCOL_BYTE_ORDER
- ):
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return timedelta(milliseconds=ctypes_object.value)
@@ -539,10 +485,7 @@ class EnumObject(StandardObject):
return cls._object_c_type
@classmethod
- def from_python(cls, value: tuple):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value: tuple):
data_type = cls.build_c_type()
data_object = data_type()
data_object.type_code = int.from_bytes(
@@ -550,15 +493,11 @@ class EnumObject(StandardObject):
byteorder=PROTOCOL_BYTE_ORDER
)
data_object.type_id, data_object.ordinal = value
- return bytes(data_object)
+
+ stream.write(data_object)
@classmethod
- def to_python(cls, ctypes_object, *args, **kwargs):
- if ctypes_object.type_code == int.from_bytes(
- TC_NULL,
- byteorder=PROTOCOL_BYTE_ORDER
- ):
- return None
+ def to_python_not_null(cls, ctypes_object, *args, **kwargs):
return ctypes_object.type_id, ctypes_object.ordinal
@@ -571,7 +510,7 @@ class BinaryEnumObject(EnumObject):
type_code = TC_BINARY_ENUM
-class StandardArray(IgniteDataType):
+class StandardArray(IgniteDataType, Nullable):
"""
Base class for array of primitives. Payload-only.
"""
@@ -599,19 +538,14 @@ class StandardArray(IgniteDataType):
)
@classmethod
- def parse(cls, client: 'Client'):
- tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
- if tc_type == TC_NULL:
- return Null.build_c_type(), tc_type
-
+ def parse_not_null(cls, stream):
header_class = cls.build_header_class()
- buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
- header = header_class.from_buffer_copy(buffer)
+ header = stream.read_ctype(header_class)
+ stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+
fields = []
for i in range(header.length):
- c_type, buffer_fragment = cls.standard_type.parse(client)
- buffer += buffer_fragment
+ c_type = cls.standard_type.parse(stream)
fields.append(('element_{}'.format(i), c_type))
final_class = type(
@@ -622,14 +556,15 @@ class StandardArray(IgniteDataType):
'_fields_': fields,
}
)
- return final_class, buffer
+ return final_class
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
- result = []
length = getattr(ctype_object, "length", None)
if length is None:
return None
+
+ result = []
for i in range(length):
result.append(
cls.standard_type.to_python(
@@ -640,9 +575,7 @@ class StandardArray(IgniteDataType):
return result
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
+ def from_python_not_null(cls, stream, value):
header_class = cls.build_header_class()
header = header_class()
if hasattr(header, 'type_code'):
@@ -652,11 +585,10 @@ class StandardArray(IgniteDataType):
)
length = len(value)
header.length = length
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += cls.standard_type.from_python(x)
- return bytes(buffer)
+ cls.standard_type.from_python(stream, x)
class StringArray(StandardArray):
@@ -804,10 +736,7 @@ class EnumArrayObject(StandardArrayObject):
)
@classmethod
- def from_python(cls, value):
- if value is None:
- return Null.from_python()
-
+ def from_python_not_null(cls, stream, value):
type_id, value = value
header_class = cls.build_header_class()
header = header_class()
@@ -819,11 +748,10 @@ class EnumArrayObject(StandardArrayObject):
length = len(value)
header.length = length
header.type_id = type_id
- buffer = bytearray(header)
+ stream.write(header)
for x in value:
- buffer += cls.standard_type.from_python(x)
- return bytes(buffer)
+ cls.standard_type.from_python(stream, x)
@classmethod
def to_python(cls, ctype_object, *args, **kwargs):
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 69b6fa2..5bd114b 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -21,6 +21,7 @@ from pyignite.api.result import APIResult
from pyignite.connection import Connection
from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED
from pyignite.queries.response import Response, SQLResponse
+from pyignite.stream import BinaryStream, READ_BACKWARD
@attr.s
@@ -47,31 +48,28 @@ class Query:
)
return cls._query_c_type
- def _build_header(self, buffer: bytearray, values: dict):
+ def _build_header(self, stream, values: dict):
header_class = self.build_c_type()
+ header_len = ctypes.sizeof(header_class)
+ init_pos = stream.tell()
+ stream.seek(init_pos + header_len)
+
header = header_class()
header.op_code = self.op_code
if self.query_id is None:
header.query_id = randint(MIN_LONG, MAX_LONG)
for name, c_type in self.following:
- buffer += c_type.from_python(values[name])
+ c_type.from_python(stream, values[name])
- header.length = (
- len(buffer)
- + ctypes.sizeof(header_class)
- - ctypes.sizeof(ctypes.c_int)
- )
+ header.length = stream.tell() - init_pos - ctypes.sizeof(ctypes.c_int)
+ stream.seek(init_pos)
return header
- def from_python(self, values: dict = None):
- if values is None:
- values = {}
- buffer = bytearray()
- header = self._build_header(buffer, values)
- buffer[:0] = bytes(header)
- return header.query_id, bytes(buffer)
+ def from_python(self, stream, values: dict = None):
+ header = self._build_header(stream, values if values else {})
+ stream.write(header)
def perform(
self, conn: Connection, query_params: dict = None,
@@ -89,8 +87,9 @@ class Query:
:return: instance of :class:`~pyignite.api.result.APIResult` with raw
value (may undergo further processing in API functions).
"""
- _, send_buffer = self.from_python(query_params)
- conn.send(send_buffer)
+ with BinaryStream(conn) as stream:
+ self.from_python(stream, query_params)
+ conn.send(stream.getbuffer())
if sql:
response_struct = SQLResponse(protocol_version=conn.get_protocol_version(),
@@ -99,8 +98,9 @@ class Query:
response_struct = Response(protocol_version=conn.get_protocol_version(),
following=response_config)
- response_ctype, recv_buffer = response_struct.parse(conn)
- response = response_ctype.from_buffer_copy(recv_buffer)
+ with BinaryStream(conn, conn.recv()) as stream:
+ response_ctype = response_struct.parse(stream)
+ response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
# this test depends on protocol version
if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED:
@@ -140,7 +140,7 @@ class ConfigQuery(Query):
)
return cls._query_c_type
- def _build_header(self, buffer: bytearray, values: dict):
- header = super()._build_header(buffer, values)
+ def _build_header(self, stream, values: dict):
+ header = super()._build_header(stream, values)
header.config_length = header.length - ctypes.sizeof(type(header))
return header
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index 05a519a..016f577 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from io import SEEK_CUR
import attr
from collections import OrderedDict
@@ -21,6 +22,7 @@ from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR
from pyignite.connection import Connection
from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct
from pyignite.queries.op_codes import OP_SUCCESS
+from pyignite.stream import READ_BACKWARD
@attr.s
@@ -55,12 +57,14 @@ class Response:
)
return self._response_header
- def parse(self, conn: Connection):
+ def parse(self, stream):
+ init_pos = stream.tell()
header_class = self.build_header()
- buffer = bytearray(conn.recv(ctypes.sizeof(header_class)))
- header = header_class.from_buffer_copy(buffer)
- fields = []
+ header_len = ctypes.sizeof(header_class)
+ header = stream.read_ctype(header_class)
+ stream.seek(header_len, SEEK_CUR)
+ fields = []
has_error = False
if self.protocol_version and self.protocol_version >= (1, 4, 0):
if header.flags & RHF_TOPOLOGY_CHANGED:
@@ -76,20 +80,19 @@ class Response:
has_error = header.status_code != OP_SUCCESS
if fields:
- buffer += conn.recv(
- sum([ctypes.sizeof(c_type) for _, c_type in fields])
- )
+ stream.seek(sum(ctypes.sizeof(c_type) for _, c_type in fields), SEEK_CUR)
if has_error:
- msg_type, buffer_fragment = String.parse(conn)
- buffer += buffer_fragment
+ msg_type = String.parse(stream)
fields.append(('error_message', msg_type))
else:
- self._parse_success(conn, buffer, fields)
+ self._parse_success(stream, fields)
- return self._create_parse_result(conn, header_class, fields, buffer)
+ response_class = self._create_response_class(stream, header_class, fields)
+ stream.seek(init_pos + ctypes.sizeof(response_class))
+ return self._create_response_class(stream, header_class, fields)
- def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray):
+ def _create_response_class(self, stream, header_class, fields: list):
response_class = type(
'Response',
(header_class,),
@@ -98,12 +101,11 @@ class Response:
'_fields_': fields,
}
)
- return response_class, bytes(buffer)
+ return response_class
- def _parse_success(self, conn: Connection, buffer: bytearray, fields: list):
+ def _parse_success(self, stream, fields: list):
for name, ignite_type in self.following:
- c_type, buffer_fragment = ignite_type.parse(conn)
- buffer += buffer_fragment
+ c_type = ignite_type.parse(stream)
fields.append((name, c_type))
def to_python(self, ctype_object, *args, **kwargs):
@@ -134,7 +136,7 @@ class SQLResponse(Response):
return 'fields', StringArray
return 'field_count', Int
- def _parse_success(self, conn: Connection, buffer: bytearray, fields: list):
+ def _parse_success(self, stream, fields: list):
following = [
self.fields_or_field_count(),
('row_count', Int),
@@ -142,9 +144,8 @@ class SQLResponse(Response):
if self.has_cursor:
following.insert(0, ('cursor', Long))
body_struct = Struct(following)
- body_class, body_buffer = body_struct.parse(conn)
- body = body_class.from_buffer_copy(body_buffer)
- buffer += body_buffer
+ body_class = body_struct.parse(stream)
+ body = stream.read_ctype(body_class, direction=READ_BACKWARD)
if self.include_field_names:
field_count = body.fields.length
@@ -155,9 +156,8 @@ class SQLResponse(Response):
for i in range(body.row_count):
row_fields = []
for j in range(field_count):
- field_class, field_buffer = AnyDataObject.parse(conn)
+ field_class = AnyDataObject.parse(stream)
row_fields.append(('column_{}'.format(j), field_class))
- buffer += field_buffer
row_class = type(
'SQLResponseRow',
@@ -182,7 +182,7 @@ class SQLResponse(Response):
('more', ctypes.c_byte),
]
- def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray):
+ def _create_response_class(self, stream, header_class, fields: list):
final_class = type(
'SQLResponse',
(header_class,),
@@ -191,8 +191,7 @@ class SQLResponse(Response):
'_fields_': fields,
}
)
- buffer += conn.recv(ctypes.sizeof(final_class) - len(buffer))
- return final_class, bytes(buffer)
+ return final_class
def to_python(self, ctype_object, *args, **kwargs):
if getattr(ctype_object, 'status_code', 0) == 0:
diff --git a/pyignite/datatypes/__init__.py b/pyignite/stream/__init__.py
similarity index 72%
copy from pyignite/datatypes/__init__.py
copy to pyignite/stream/__init__.py
index 5024f79..94153b4 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/stream/__init__.py
@@ -13,15 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""
-This module contains classes, used internally by `pyignite` for parsing and
-creating binary data.
-"""
-
-from .complex import *
-from .internal import *
-from .null_object import *
-from .primitive import *
-from .primitive_arrays import *
-from .primitive_objects import *
-from .standard import *
+from .binary_stream import BinaryStream, READ_FORWARD, READ_BACKWARD
\ No newline at end of file
diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py
new file mode 100644
index 0000000..1ecdcfb
--- /dev/null
+++ b/pyignite/stream/binary_stream.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import ctypes
+from io import BytesIO
+
+import pyignite.utils as ignite_utils
+
+READ_FORWARD = 0
+READ_BACKWARD = 1
+
+
+class BinaryStream:
+ def __init__(self, conn, buf=None):
+ """
+ Initialize binary stream around buffers.
+
+ :param buf: Buffer, optional parameter. If not passed, creates empty BytesIO.
+ :param conn: Connection instance, required.
+ """
+ from pyignite.connection import Connection
+
+ if not isinstance(conn, Connection):
+ raise TypeError(f"invalid parameter: expected instance of {Connection}")
+
+ if buf and not isinstance(buf, (bytearray, bytes, memoryview)):
+ raise TypeError(f"invalid parameter: expected bytes-like object")
+
+ self.conn = conn
+ self.stream = BytesIO(buf) if buf else BytesIO()
+
+ @property
+ def compact_footer(self) -> bool:
+ return self.conn.client.compact_footer
+
+ @compact_footer.setter
+ def compact_footer(self, value: bool):
+ self.conn.client.compact_footer = value
+
+ def read(self, size):
+ buf = bytearray(size)
+ self.stream.readinto(buf)
+ return buf
+
+ def read_ctype(self, ctype_class, position=None, direction=READ_FORWARD):
+ ctype_len = ctypes.sizeof(ctype_class)
+
+ if position is not None and position >= 0:
+ init_position = position
+ else:
+ init_position = self.tell()
+
+ if direction == READ_FORWARD:
+ start, end = init_position, init_position + ctype_len
+ else:
+ start, end = init_position - ctype_len, init_position
+
+ buf = self.stream.getbuffer()[start:end]
+ return ctype_class.from_buffer_copy(buf)
+
+ def write(self, buf):
+ return self.stream.write(buf)
+
+ def tell(self):
+ return self.stream.tell()
+
+ def seek(self, *args, **kwargs):
+ return self.stream.seek(*args, **kwargs)
+
+ def getvalue(self):
+ return self.stream.getvalue()
+
+ def getbuffer(self):
+ return self.stream.getbuffer()
+
+ def mem_view(self, start=-1, offset=0):
+ start = start if start >= 0 else self.tell()
+ return self.stream.getbuffer()[start:start+offset]
+
+ def hashcode(self, start, bytes_len):
+ return ignite_utils.hashcode(self.stream.getbuffer()[start:start+bytes_len])
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.stream.close()
+
+ def get_dataclass(self, header):
+ # get field names from outer space
+ result = self.conn.client.query_binary_type(
+ header.type_id,
+ header.schema_id
+ )
+ if not result:
+ raise RuntimeError('Binary type is not registered')
+ return result
+
+ def register_binary_type(self, *args, **kwargs):
+ return self.conn.client.register_binary_type(*args, **kwargs)
diff --git a/pyignite/utils.py b/pyignite/utils.py
index ef7b6f6..3d0378f 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -19,7 +19,7 @@ import warnings
from functools import wraps
from threading import Event, Thread
-from typing import Any, Callable, Optional, Type, Tuple, Union
+from typing import Any, Optional, Type, Tuple, Union
from pyignite.datatypes.base import IgniteDataType
from .constants import *
@@ -85,29 +85,7 @@ def int_overflow(value: int) -> int:
return ((value ^ 0x80000000) & 0xffffffff) - 0x80000000
-def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
- """
- Unwrap wrapped BinaryObject and convert it to Python data.
-
- :param client: connection to Ignite cluster,
- :param wrapped: `WrappedDataObject` value,
- :return: dict representing wrapped BinaryObject.
- """
- from pyignite.datatypes.complex import BinaryObject
-
- blob, offset = wrapped
- conn_clone = client.random_node.clone(prefetch=blob)
- conn_clone.pos = offset
- data_class, data_bytes = BinaryObject.parse(conn_clone)
- result = BinaryObject.to_python(
- data_class.from_buffer_copy(data_bytes),
- client,
- )
- conn_clone.close()
- return result
-
-
-def hashcode(data: Union[str, bytes]) -> int:
+def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:
"""
Calculate hash code used for identifying objects in Ignite binary API.
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 327f501..893928e 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -4,3 +4,4 @@ pytest==3.6.1
pytest-cov==2.5.1
teamcity-messages==1.21
psutil==5.6.5
+jinja2==2.11.3
diff --git a/tests/config/ignite-config-ssl.xml b/tests/config/ignite-config-ssl.xml
deleted file mode 100644
index 827405c..0000000
--- a/tests/config/ignite-config-ssl.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd">
- <import resource="ignite-config-base.xml"/>
-
- <bean parent="grid.cfg">
- <property name="connectorConfiguration"><null/></property>
-
- <property name="clientConnectorConfiguration">
- <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
- <property name="host" value="127.0.0.1"/>
- <property name="port" value="${IGNITE_CLIENT_PORT}"/>
- <property name="portRange" value="0"/>
- <property name="sslEnabled" value="true"/>
- <property name="useIgniteSslContextFactory" value="false"/>
- <property name="sslClientAuth" value="true"/>
-
- <!-- Provide Ssl context. -->
- <property name="sslContextFactory">
- <bean class="org.apache.ignite.ssl.SslContextFactory">
- <property name="keyStoreFilePath" value="config/ssl/server.jks"/>
- <property name="keyStorePassword" value="123456"/>
- <property name="trustStoreFilePath" value="config/ssl/trust.jks"/>
- <property name="trustStorePassword" value="123456"/>
- </bean>
- </property>
- </bean>
- </property>
- </bean>
-</beans>
\ No newline at end of file
diff --git a/tests/config/ignite-config.xml b/tests/config/ignite-config.xml
deleted file mode 100644
index 09fba2c..0000000
--- a/tests/config/ignite-config.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:util="http://www.springframework.org/schema/util"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/util
- http://www.springframework.org/schema/util/spring-util.xsd">
- <import resource="ignite-config-base.xml"/>
-
- <bean parent="grid.cfg">
- <property name="clientConnectorConfiguration">
- <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
- <property name="host" value="127.0.0.1"/>
- <property name="port" value="${IGNITE_CLIENT_PORT}"/>
- <property name="portRange" value="0"/>
- </bean>
- </property>
- </bean>
-</beans>
diff --git a/tests/config/ignite-config-base.xml b/tests/config/ignite-config.xml.jinja2
similarity index 65%
rename from tests/config/ignite-config-base.xml
rename to tests/config/ignite-config.xml.jinja2
index 7487618..322a958 100644
--- a/tests/config/ignite-config-base.xml
+++ b/tests/config/ignite-config.xml.jinja2
@@ -26,12 +26,35 @@
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
- <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK"/>
- <property name="searchSystemEnvironment" value="true"/>
- </bean>
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ {% if use_ssl %}
+ <property name="connectorConfiguration"><null/></property>
+ {% endif %}
+
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="{{ ignite_client_port }}"/>
+ <property name="portRange" value="0"/>
+ {% if use_ssl %}
+ <property name="sslEnabled" value="true"/>
+ <property name="useIgniteSslContextFactory" value="false"/>
+ <property name="sslClientAuth" value="true"/>
+
+ <property name="sslContextFactory">
+ <bean class="org.apache.ignite.ssl.SslContextFactory">
+ <property name="keyStoreFilePath" value="config/ssl/server.jks"/>
+ <property name="keyStorePassword" value="123456"/>
+ <property name="trustStoreFilePath" value="config/ssl/trust.jks"/>
+ <property name="trustStorePassword" value="123456"/>
+ </bean>
+ </property>
+ {% endif %}
+ </bean>
+ </property>
+
+ <property name="consistentId" value="srv_{{ ignite_instance_idx }}"/>
- <bean id="grid.cfg" abstract="true" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="localHost" value="127.0.0.1"/>
<property name="discoverySpi">
@@ -42,7 +65,7 @@
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
- <value>127.0.0.1:48500..48503</value>
+ <value>127.0.0.1:48500..48510</value>
</list>
</property>
</bean>
@@ -69,9 +92,9 @@
</list>
</property>
- <property name="gridLogger">
+ <property name="gridLogger">
<bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
- <constructor-arg type="java.lang.String" value="config/log4j.xml"/>
+ <constructor-arg type="java.lang.String" value="config/log4j-{{ ignite_instance_idx }}.xml"/>
</bean>
</property>
</bean>
diff --git a/tests/config/log4j.xml b/tests/config/log4j.xml.jinja2
similarity index 90%
rename from tests/config/log4j.xml
rename to tests/config/log4j.xml.jinja2
index f5562d0..628f66c 100644
--- a/tests/config/log4j.xml
+++ b/tests/config/log4j.xml.jinja2
@@ -23,8 +23,8 @@
<PatternLayout pattern="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
</Console>
<RollingFile name="FILE" append="true"
- filePattern="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}-%i.log.gz"
- fileName="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}.txt">
+ filePattern="logs/ignite-log-{{ ignite_instance_idx }}-%i.txt"
+ fileName="logs/ignite-log-{{ ignite_instance_idx }}.txt">
<PatternLayout pattern="%m%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="10MB" />
diff --git a/tests/conftest.py b/tests/conftest.py
index 9974b16..54a7fda 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -22,7 +22,7 @@ import pytest
from pyignite import Client
from pyignite.constants import *
from pyignite.api import cache_create, cache_destroy
-from tests.util import _start_ignite, start_ignite_gen, get_request_grid_idx
+from tests.util import _start_ignite, start_ignite_gen
class BoolParser(argparse.Action):
@@ -134,12 +134,6 @@ def cache(client):
cache_destroy(conn, cache_name)
-@pytest.fixture(autouse=True)
-def log_init():
- # Init log call timestamp
- get_request_grid_idx()
-
-
@pytest.fixture(scope='module')
def start_client(use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers,
ssl_version,username, password):
diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py
index eb46ab6..cd0c015 100644
--- a/tests/test_affinity_request_routing.py
+++ b/tests/test_affinity_request_routing.py
@@ -13,18 +13,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import OrderedDict
-
+from collections import OrderedDict, deque
import pytest
from pyignite import *
+from pyignite.connection import Connection
from pyignite.datatypes import *
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import *
from tests.util import *
-@pytest.mark.parametrize("key,grid_idx", [(1, 3), (2, 1), (3, 1), (4, 3), (5, 1), (6, 3), (11, 2), (13, 2), (19, 2)])
+requests = deque()
+old_send = Connection.send
+
+
+def patched_send(self, *args, **kwargs):
+ """Patched send function that push to queue idx of server to which request is routed."""
+ requests.append(self.port % 100)
+ return old_send(self, *args, **kwargs)
+
+
+def setup_function():
+ requests.clear()
+ Connection.send = patched_send
+
+
+def teardown_function():
+ Connection.send = old_send
+
+
+def wait_for_affinity_distribution(cache, key, node_idx, timeout=30):
+ real_node_idx = 0
+
+ def check_grid_idx():
+ nonlocal real_node_idx
+ try:
+ cache.get(key)
+ real_node_idx = requests.pop()
+ except (OSError, IOError):
+ return False
+ return real_node_idx == node_idx
+
+ res = wait_for_condition(check_grid_idx, timeout=timeout)
+
+ if not res:
+ raise TimeoutError(f"failed to wait for affinity distribution, expected node_idx {node_idx},"
+ f"got {real_node_idx} instead")
+
+
+@pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)])
@pytest.mark.parametrize("backups", [0, 1, 2, 3])
def test_cache_operation_on_primitive_key_routes_request_to_primary_node(
request, key, grid_idx, backups, client_partition_aware):
@@ -34,52 +72,51 @@ def test_cache_operation_on_primitive_key_routes_request_to_primary_node(
PROP_BACKUPS_NUMBER: backups,
})
- # Warm up affinity map
cache.put(key, key)
- get_request_grid_idx()
+ wait_for_affinity_distribution(cache, key, grid_idx)
# Test
cache.get(key)
- assert get_request_grid_idx() == grid_idx
+ assert requests.pop() == grid_idx
cache.put(key, key)
- assert get_request_grid_idx("Put") == grid_idx
+ assert requests.pop() == grid_idx
cache.replace(key, key + 1)
- assert get_request_grid_idx("Replace") == grid_idx
+ assert requests.pop() == grid_idx
cache.clear_key(key)
- assert get_request_grid_idx("ClearKey") == grid_idx
+ assert requests.pop() == grid_idx
cache.contains_key(key)
- assert get_request_grid_idx("ContainsKey") == grid_idx
+ assert requests.pop() == grid_idx
cache.get_and_put(key, 3)
- assert get_request_grid_idx("GetAndPut") == grid_idx
+ assert requests.pop() == grid_idx
cache.get_and_put_if_absent(key, 4)
- assert get_request_grid_idx("GetAndPutIfAbsent") == grid_idx
+ assert requests.pop() == grid_idx
cache.put_if_absent(key, 5)
- assert get_request_grid_idx("PutIfAbsent") == grid_idx
+ assert requests.pop() == grid_idx
cache.get_and_remove(key)
- assert get_request_grid_idx("GetAndRemove") == grid_idx
+ assert requests.pop() == grid_idx
cache.get_and_replace(key, 6)
- assert get_request_grid_idx("GetAndReplace") == grid_idx
+ assert requests.pop() == grid_idx
cache.remove_key(key)
- assert get_request_grid_idx("RemoveKey") == grid_idx
+ assert requests.pop() == grid_idx
cache.remove_if_equals(key, -1)
- assert get_request_grid_idx("RemoveIfEquals") == grid_idx
+ assert requests.pop() == grid_idx
cache.replace(key, -1)
- assert get_request_grid_idx("Replace") == grid_idx
+ assert requests.pop() == grid_idx
cache.replace_if_equals(key, 10, -10)
- assert get_request_grid_idx("ReplaceIfEquals") == grid_idx
+ assert requests.pop() == grid_idx
@pytest.mark.skip(reason="Custom key objects are not supported yet")
@@ -121,31 +158,28 @@ def test_cache_operation_on_custom_affinity_key_routes_request_to_primary_node(
cache.put(key_obj, 1)
cache.put(key_obj, 2)
- assert get_request_grid_idx("Put") == grid_idx
+ assert requests.pop() == grid_idx
-@pytest.mark.skip("https://issues.apache.org/jira/browse/IGNITE-13967")
def test_cache_operation_routed_to_new_cluster_node(request, start_ignite_server, start_client):
client = start_client(partition_aware=True)
client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802), ("127.0.0.1", 10803), ("127.0.0.1", 10804)])
cache = client.get_or_create_cache(request.node.name)
key = 12
+ wait_for_affinity_distribution(cache, key, 3)
cache.put(key, key)
cache.put(key, key)
- assert get_request_grid_idx("Put") == 3
+ assert requests.pop() == 3
srv = start_ignite_server(4)
try:
# Wait for rebalance and partition map exchange
- def check_grid_idx():
- cache.get(key)
- return get_request_grid_idx() == 4
- wait_for_condition(check_grid_idx)
+ wait_for_affinity_distribution(cache, key, 4)
# Response is correct and comes from the new node
res = cache.get_and_remove(key)
assert res == key
- assert get_request_grid_idx("GetAndRemove") == 4
+ assert requests.pop() == 4
finally:
kill_process_tree(srv.pid)
@@ -167,13 +201,13 @@ def verify_random_node(cache):
key = 1
cache.put(key, key)
- idx1 = get_request_grid_idx("Put")
+ idx1 = requests.pop()
idx2 = idx1
# Try 10 times - random node may end up being the same
for _ in range(1, 10):
cache.put(key, key)
- idx2 = get_request_grid_idx("Put")
+ idx2 = requests.pop()
if idx2 != idx1:
break
assert idx1 != idx2
diff --git a/tests/test_affinity_single_connection.py b/tests/test_affinity_single_connection.py
index c40393c..1943384 100644
--- a/tests/test_affinity_single_connection.py
+++ b/tests/test_affinity_single_connection.py
@@ -13,10 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import pytest
-
-from tests.util import get_request_grid_idx
-
def test_all_cache_operations_with_partition_aware_client_on_single_server(request, client_partition_aware_single_server):
cache = client_partition_aware_single_server.get_or_create_cache(request.node.name)
diff --git a/tests/test_cache_composite_key_class_sql.py b/tests/test_cache_composite_key_class_sql.py
index 2f1705f..989a229 100644
--- a/tests/test_cache_composite_key_class_sql.py
+++ b/tests/test_cache_composite_key_class_sql.py
@@ -111,13 +111,12 @@ def test_python_sql_finds_inserted_value_with_composite_key(client):
def validate_query_result(student_key, student_val, query_result):
- '''
+ """
Compare query result with expected key and value.
- '''
+ """
assert len(query_result) == 2
sql_row = dict(zip(query_result[0], query_result[1]))
- assert sql_row["_KEY"][0] == student_key._buffer
assert sql_row['ID'] == student_key.ID
assert sql_row['DEPT'] == student_key.DEPT
assert sql_row['NAME'] == student_val.NAME
diff --git a/tests/test_sql.py b/tests/test_sql.py
index 15f84ee..c896afb 100644
--- a/tests/test_sql.py
+++ b/tests/test_sql.py
@@ -22,7 +22,9 @@ from pyignite.api import (
)
from pyignite.datatypes.prop_codes import *
from pyignite.exceptions import SQLError
-from pyignite.utils import entity_id, unwrap_binary
+from pyignite.utils import entity_id
+from pyignite.binary import unwrap_binary
+
initial_data = [
('John', 'Doe', 5),
diff --git a/tests/util.py b/tests/util.py
index 1d6acd6..90f0146 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -15,6 +15,8 @@
import glob
import os
+
+import jinja2 as jinja2
import psutil
import re
import signal
@@ -72,22 +74,19 @@ def get_ignite_config_path(use_ssl=False):
if use_ssl:
file_name = "ignite-config-ssl.xml"
else:
- file_name = "ignite-config.xml"
+ file_name = "ignite-config.xml.jinja2"
return os.path.join(get_test_dir(), "config", file_name)
def check_server_started(idx=1):
- log_file = os.path.join(get_test_dir(), "logs", f"ignite-log-{idx}.txt")
- if not os.path.exists(log_file):
- return False
-
pattern = re.compile('^Topology snapshot.*')
- with open(log_file) as f:
- for line in f.readlines():
- if pattern.match(line):
- return True
+ for log_file in get_log_files(idx):
+ with open(log_file) as f:
+ for line in f.readlines():
+ if pattern.match(line):
+ return True
return False
@@ -102,20 +101,33 @@ def kill_process_tree(pid):
os.kill(pid, signal.SIGKILL)
+templateLoader = jinja2.FileSystemLoader(searchpath=os.path.join(get_test_dir(), "config"))
+templateEnv = jinja2.Environment(loader=templateLoader)
+
+
+def create_config_file(tpl_name, file_name, **kwargs):
+ template = templateEnv.get_template(tpl_name)
+ with open(os.path.join(get_test_dir(), "config", file_name), mode='w') as f:
+ f.write(template.render(**kwargs))
+
+
def _start_ignite(idx=1, debug=False, use_ssl=False):
clear_logs(idx)
runner = get_ignite_runner()
env = os.environ.copy()
- env['IGNITE_INSTANCE_INDEX'] = str(idx)
- env['IGNITE_CLIENT_PORT'] = str(10800 + idx)
if debug:
env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
- ignite_cmd = [runner, get_ignite_config_path(use_ssl)]
+ params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl}
+
+ create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params)
+ create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params)
+
+ ignite_cmd = [runner, os.path.join(get_test_dir(), "config", f'ignite-config-{idx}.xml')]
print("Starting Ignite server node:", ignite_cmd)
srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
@@ -142,38 +154,3 @@ def get_log_files(idx=1):
def clear_logs(idx=1):
for f in get_log_files(idx):
os.remove(f)
-
-
-def read_log_file(file, idx):
- i = -1
- with open(file) as f:
- lines = f.readlines()
- for line in lines:
- i += 1
-
- if i < read_log_file.last_line[idx]:
- continue
-
- if i > read_log_file.last_line[idx]:
- read_log_file.last_line[idx] = i
-
- # Example: Client request received [reqId=1, addr=/127.0.0.1:51694,
- # req=org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest@1f33101e]
- res = re.match("Client request received .*?req=org.apache.ignite.internal.processors."
- "platform.client.cache.ClientCache([a-zA-Z]+)Request@", line)
-
- if res is not None:
- yield res.group(1)
-
-
-def get_request_grid_idx(message="Get"):
- res = -1
- for i in range(1, 5):
- for log_file in get_log_files(i):
- for log in read_log_file(log_file, i):
- if log == message:
- res = i # Do not exit early to advance all log positions
- return res
-
-
-read_log_file.last_line = [0, 0, 0, 0, 0]
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 69db226..4361413 100644
--- a/tox.ini
+++ b/tox.ini
@@ -34,6 +34,10 @@ usedevelop = True
commands =
pytest {env:PYTESTARGS:} {posargs}
+[jenkins]
+setenv:
+ PYTESTARGS = --junitxml=junit-{envname}.xml
+
[no-ssl]
setenv:
PYTEST_ADDOPTS = --examples
@@ -54,3 +58,18 @@ setenv: {[ssl]setenv}
[testenv:py{36,37,38}-ssl-password]
setenv: {[ssl-password]setenv}
+
+[testenv:py{36,37,38}-jenkins-no-ssl]
+setenv:
+ {[no-ssl]setenv}
+ {[jenkins]setenv}
+
+[testenv:py{36,37,38}-jenkins-ssl]
+setenv:
+ {[ssl]setenv}
+ {[jenkins]setenv}
+
+[testenv:py{36,37,38}-jenkins-ssl-password]
+setenv:
+ {[ssl-password]setenv}
+ {[jenkins]setenv}