You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/02/08 14:12:50 UTC

[ignite-python-thin-client] branch master updated: IGNITE-13967: Optimizations and refactoring of parsing

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ead7b9  IGNITE-13967: Optimizations and refactoring of parsing
2ead7b9 is described below

commit 2ead7b928c82dd212273789a275b717ecafca295
Author: Ivan Dashchinskiy <iv...@gmail.com>
AuthorDate: Mon Feb 8 17:11:05 2021 +0300

    IGNITE-13967: Optimizations and refactoring of parsing
    
    This closes #10
---
 .gitignore                                         |   4 +-
 pyignite/api/affinity.py                           |   8 +-
 pyignite/api/binary.py                             |  70 ++++---
 pyignite/binary.py                                 |  47 +++--
 pyignite/cache.py                                  |   4 +-
 pyignite/connection/__init__.py                    | 216 ++++++++-----------
 pyignite/connection/handshake.py                   |   5 +-
 pyignite/datatypes/__init__.py                     |  19 ++
 pyignite/datatypes/cache_properties.py             |  18 +-
 pyignite/datatypes/complex.py                      | 229 ++++++---------------
 pyignite/datatypes/internal.py                     |  99 +++++----
 pyignite/datatypes/null_object.py                  |  55 ++++-
 pyignite/datatypes/primitive.py                    |  43 ++--
 pyignite/datatypes/primitive_arrays.py             |  61 +++---
 pyignite/datatypes/primitive_objects.py            |  32 ++-
 pyignite/datatypes/standard.py                     | 210 +++++++------------
 pyignite/queries/query.py                          |  40 ++--
 pyignite/queries/response.py                       |  49 +++--
 pyignite/{datatypes => stream}/__init__.py         |  13 +-
 pyignite/stream/binary_stream.py                   | 111 ++++++++++
 pyignite/utils.py                                  |  26 +--
 requirements/tests.txt                             |   1 +
 tests/config/ignite-config-ssl.xml                 |  51 -----
 tests/config/ignite-config.xml                     |  39 ----
 ...te-config-base.xml => ignite-config.xml.jinja2} |  39 +++-
 tests/config/{log4j.xml => log4j.xml.jinja2}       |   4 +-
 tests/conftest.py                                  |   8 +-
 tests/test_affinity_request_routing.py             |  92 ++++++---
 tests/test_affinity_single_connection.py           |   4 -
 tests/test_cache_composite_key_class_sql.py        |   5 +-
 tests/test_sql.py                                  |   4 +-
 tests/util.py                                      |  71 +++----
 tox.ini                                            |  19 ++
 33 files changed, 787 insertions(+), 909 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7372921..d28510c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,8 @@
 .eggs
 .pytest_cache
 .tox
+tests/config/*.xml
+junit*.xml
 pyignite.egg-info
 ignite-log-*
-__pycache__
\ No newline at end of file
+__pycache__
diff --git a/pyignite/api/affinity.py b/pyignite/api/affinity.py
index d28cfb8..16148a1 100644
--- a/pyignite/api/affinity.py
+++ b/pyignite/api/affinity.py
@@ -55,12 +55,12 @@ empty_node_mapping = Struct([])
 partition_mapping = StructArray([
     ('is_applicable', Bool),
 
-    ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
-                                  lambda ctx: ctx['is_applicable'] is True,
+    ('cache_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+                                  lambda ctx: ctx['is_applicable'],
                                   cache_mapping, empty_cache_mapping)),
 
-    ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] == b'\x01',
-                                 lambda ctx: ctx['is_applicable'] is True,
+    ('node_mapping', Conditional(lambda ctx: ctx['is_applicable'] and ctx['is_applicable'].value == 1,
+                                 lambda ctx: ctx['is_applicable'],
                                  node_mapping, empty_node_mapping)),
 ])
 
diff --git a/pyignite/api/binary.py b/pyignite/api/binary.py
index 722001a..0e63c17 100644
--- a/pyignite/api/binary.py
+++ b/pyignite/api/binary.py
@@ -24,16 +24,15 @@ from pyignite.queries import Query
 from pyignite.queries.op_codes import *
 from pyignite.utils import int_overflow, entity_id
 from .result import APIResult
+from ..stream import BinaryStream, READ_BACKWARD
 from ..queries.response import Response
 
 
-def get_binary_type(
-    connection: 'Connection', binary_type: Union[str, int], query_id=None,
-) -> APIResult:
+def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
     """
     Gets the binary type information by type ID.
 
-    :param connection: connection to Ignite server,
+    :param conn: connection to Ignite server,
     :param binary_type: binary type name or ID,
     :param query_id: (optional) a value generated by client and returned as-is
      in response.query_id. When the parameter is omitted, a random value
@@ -49,39 +48,42 @@ def get_binary_type(
         query_id=query_id,
     )
 
-    _, send_buffer = query_struct.from_python({
-        'type_id': entity_id(binary_type),
-    })
-    connection.send(send_buffer)
+    with BinaryStream(conn) as stream:
+        query_struct.from_python(stream, {
+            'type_id': entity_id(binary_type),
+        })
+        conn.send(stream.getbuffer())
 
-    response_head_struct = Response(protocol_version=connection.get_protocol_version(),
+    response_head_struct = Response(protocol_version=conn.get_protocol_version(),
                                     following=[('type_exists', Bool)])
 
-    response_head_type, recv_buffer = response_head_struct.parse(connection)
-    response_head = response_head_type.from_buffer_copy(recv_buffer)
-    response_parts = []
-    if response_head.type_exists:
-        resp_body_type, resp_body_buffer = body_struct.parse(connection)
-        response_parts.append(('body', resp_body_type))
-        resp_body = resp_body_type.from_buffer_copy(resp_body_buffer)
-        recv_buffer += resp_body_buffer
-        if resp_body.is_enum:
-            resp_enum, resp_enum_buffer = enum_struct.parse(connection)
-            response_parts.append(('enums', resp_enum))
-            recv_buffer += resp_enum_buffer
-        resp_schema_type, resp_schema_buffer = schema_struct.parse(connection)
-        response_parts.append(('schema', resp_schema_type))
-        recv_buffer += resp_schema_buffer
-
-    response_class = type(
-        'GetBinaryTypeResponse',
-        (response_head_type,),
-        {
-            '_pack_': 1,
-            '_fields_': response_parts,
-        }
-    )
-    response = response_class.from_buffer_copy(recv_buffer)
+    with BinaryStream(conn, conn.recv()) as stream:
+        init_pos = stream.tell()
+        response_head_type = response_head_struct.parse(stream)
+        response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD)
+
+        response_parts = []
+        if response_head.type_exists:
+            resp_body_type = body_struct.parse(stream)
+            response_parts.append(('body', resp_body_type))
+            resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD)
+            if resp_body.is_enum:
+                resp_enum = enum_struct.parse(stream)
+                response_parts.append(('enums', resp_enum))
+
+            resp_schema_type = schema_struct.parse(stream)
+            response_parts.append(('schema', resp_schema_type))
+
+        response_class = type(
+            'GetBinaryTypeResponse',
+            (response_head_type,),
+            {
+                '_pack_': 1,
+                '_fields_': response_parts,
+            }
+        )
+        response = stream.read_ctype(response_class, position=init_pos)
+
     result = APIResult(response)
     if result.status != 0:
         return result
diff --git a/pyignite/binary.py b/pyignite/binary.py
index 5d76c1b..da62bb5 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -102,18 +102,17 @@ class GenericObjectMeta(GenericObjectPropsMeta):
             mcs, name, (GenericObjectProps, )+base_classes, namespace
         )
 
-        def _build(self, client: 'Client' = None) -> int:
+        def _from_python(self, stream, save_to_buf=False):
             """
             Method for building binary representation of the Generic object
             and calculating a hashcode from it.
 
             :param self: Generic object instance,
-            :param client: (optional) connection to Ignite cluster,
+            :param stream: BinaryStream
+            :param save_to_buf: Optional. If True, save serialized data to buffer.
             """
-            if client is None:
-                compact_footer = True
-            else:
-                compact_footer = client.compact_footer
+
+            compact_footer = stream.compact_footer
 
             # prepare header
             header_class = BinaryObject.build_header()
@@ -129,18 +128,19 @@ class GenericObjectMeta(GenericObjectPropsMeta):
             header.type_id = self.type_id
             header.schema_id = self.schema_id
 
+            header_len = ctypes.sizeof(header_class)
+            initial_pos = stream.tell()
+
             # create fields and calculate offsets
             offsets = [ctypes.sizeof(header_class)]
-            field_buffer = bytearray()
             schema_items = list(self.schema.items())
+
+            stream.seek(initial_pos + header_len)
             for field_name, field_type in schema_items:
-                partial_buffer = field_type.from_python(
-                    getattr(
-                        self, field_name, getattr(field_type, 'default', None)
-                    )
-                )
-                offsets.append(max(offsets) + len(partial_buffer))
-                field_buffer += partial_buffer
+                val = getattr(self, field_name, getattr(field_type, 'default', None))
+                field_start_pos = stream.tell()
+                field_type.from_python(stream, val)
+                offsets.append(max(offsets) + stream.tell() - field_start_pos)
 
             offsets = offsets[:-1]
 
@@ -160,15 +160,18 @@ class GenericObjectMeta(GenericObjectPropsMeta):
                     schema[i].offset = offset
 
             # calculate size and hash code
-            header.schema_offset = (
-                ctypes.sizeof(header_class)
-                + len(field_buffer)
-            )
+            fields_data_len = stream.tell() - initial_pos - header_len
+            header.schema_offset = fields_data_len + header_len
             header.length = header.schema_offset + ctypes.sizeof(schema_class)
-            header.hash_code = hashcode(field_buffer)
+            header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len)
+
+            stream.seek(initial_pos)
+            stream.write(header)
+            stream.seek(initial_pos + header.schema_offset)
+            stream.write(schema)
 
-            # reuse the results
-            self._buffer = bytes(header) + field_buffer + bytes(schema)
+            if save_to_buf:
+                self._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
             self._hashcode = header.hash_code
 
         def _setattr(self, attr_name: str, attr_value: Any):
@@ -180,7 +183,7 @@ class GenericObjectMeta(GenericObjectPropsMeta):
             # `super()` is really need these parameters
             super(result, self).__setattr__(attr_name, attr_value)
 
-        setattr(result, _build.__name__, _build)
+        setattr(result, _from_python.__name__, _from_python)
         setattr(result, '__setattr__', _setattr)
         setattr(result, '_buffer', None)
         setattr(result, '_hashcode', None)
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 64093e8..dd7dac4 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -17,7 +17,7 @@ import time
 from typing import Any, Dict, Iterable, Optional, Tuple, Union
 
 from .constants import *
-from .binary import GenericObjectMeta
+from .binary import GenericObjectMeta, unwrap_binary
 from .datatypes import prop_codes
 from .datatypes.internal import AnyDataObject
 from .exceptions import (
@@ -26,7 +26,7 @@ from .exceptions import (
 )
 from .utils import (
     cache_id, get_field_by_id, is_wrapped,
-    status_to_exception, unsigned, unwrap_binary,
+    status_to_exception, unsigned
 )
 from .api.cache_config import (
     cache_create, cache_create_with_config,
diff --git a/pyignite/connection/__init__.py b/pyignite/connection/__init__.py
index cf40718..0e793f8 100644
--- a/pyignite/connection/__init__.py
+++ b/pyignite/connection/__init__.py
@@ -35,7 +35,7 @@ as well as Ignite protocol handshaking.
 
 from collections import OrderedDict
 import socket
-from threading import Lock
+from threading import RLock
 from typing import Union
 
 from pyignite.constants import *
@@ -52,6 +52,8 @@ from .ssl import wrap
 
 __all__ = ['Connection']
 
+from ..stream import BinaryStream, READ_BACKWARD
+
 
 class Connection:
     """
@@ -60,8 +62,7 @@ class Connection:
 
      * socket wrapper. Detects fragmentation and network errors. See also
        https://docs.python.org/3/howto/sockets.html,
-     * binary protocol connector. Incapsulates handshake, data read-ahead and
-       failover reconnection.
+     * binary protocol connector. Encapsulates handshake and failover reconnection.
     """
 
     _socket = None
@@ -72,7 +73,6 @@ class Connection:
     host = None
     port = None
     timeout = None
-    prefetch = None
     username = None
     password = None
     ssl_params = {}
@@ -97,7 +97,7 @@ class Connection:
                 ).format(param))
 
     def __init__(
-        self, client: 'Client', prefetch: bytes = b'', timeout: int = None,
+        self, client: 'Client', timeout: float = 2.0,
         username: str = None, password: str = None, **ssl_params
     ):
         """
@@ -107,8 +107,6 @@ class Connection:
         https://docs.python.org/3/library/ssl.html#ssl-certificates.
 
         :param client: Ignite client object,
-        :param prefetch: (optional) initialize the read-ahead data buffer.
-         Empty by default,
         :param timeout: (optional) sets timeout (in seconds) for each socket
          operation including `connect`. 0 means non-blocking mode, which is
          virtually guaranteed to fail. Can accept integer or float value.
@@ -143,7 +141,6 @@ class Connection:
         :param password: (optional) password to authenticate to Ignite cluster.
         """
         self.client = client
-        self.prefetch = prefetch
         self.timeout = timeout
         self.username = username
         self.password = password
@@ -152,7 +149,8 @@ class Connection:
             ssl_params['use_ssl'] = True
         self.ssl_params = ssl_params
         self._failed = False
-        self._in_use = Lock()
+        self._mux = RLock()
+        self._in_use = False
 
     @property
     def socket(self) -> socket.socket:
@@ -162,17 +160,20 @@ class Connection:
     @property
     def closed(self) -> bool:
         """ Tells if socket is closed. """
-        return self._socket is None
+        with self._mux:
+            return self._socket is None
 
     @property
     def failed(self) -> bool:
         """ Tells if connection is failed. """
-        return self._failed
+        with self._mux:
+            return self._failed
 
     @property
     def alive(self) -> bool:
         """ Tells if connection is up and no failure detected. """
-        return not (self._failed or self.closed)
+        with self._mux:
+            return not (self._failed or self.closed)
 
     def __repr__(self) -> str:
         return '{}:{}'.format(self.host or '?', self.port or '?')
@@ -189,8 +190,10 @@ class Connection:
 
     def _fail(self):
         """ set client to failed state. """
-        self._failed = True
-        self._in_use.release()
+        with self._mux:
+            self._failed = True
+
+            self._in_use = False
 
     def read_response(self) -> Union[dict, OrderedDict]:
         """
@@ -202,26 +205,27 @@ class Connection:
             ('length', Int),
             ('op_code', Byte),
         ])
-        start_class, start_buffer = response_start.parse(self)
-        start = start_class.from_buffer_copy(start_buffer)
-        data = response_start.to_python(start)
-        response_end = None
-        if data['op_code'] == 0:
-            response_end = Struct([
-                ('version_major', Short),
-                ('version_minor', Short),
-                ('version_patch', Short),
-                ('message', String),
-            ])
-        elif self.get_protocol_version() >= (1, 4, 0):
-            response_end = Struct([
-                ('node_uuid', UUIDObject),
-            ])
-        if response_end:
-            end_class, end_buffer = response_end.parse(self)
-            end = end_class.from_buffer_copy(end_buffer)
-            data.update(response_end.to_python(end))
-        return data
+        with BinaryStream(self, self.recv()) as stream:
+            start_class = response_start.parse(stream)
+            start = stream.read_ctype(start_class, direction=READ_BACKWARD)
+            data = response_start.to_python(start)
+            response_end = None
+            if data['op_code'] == 0:
+                response_end = Struct([
+                    ('version_major', Short),
+                    ('version_minor', Short),
+                    ('version_patch', Short),
+                    ('message', String),
+                ])
+            elif self.get_protocol_version() >= (1, 4, 0):
+                response_end = Struct([
+                    ('node_uuid', UUIDObject),
+                ])
+            if response_end:
+                end_class = response_end.parse(stream)
+                end = stream.read_ctype(end_class, direction=READ_BACKWARD)
+                data.update(response_end.to_python(end))
+            return data
 
     def connect(
         self, host: str = None, port: int = None
@@ -234,9 +238,10 @@ class Connection:
         """
         detecting_protocol = False
 
-        # go non-blocking for faster reconnect
-        if not self._in_use.acquire(blocking=False):
-            raise ConnectionError('Connection is in use.')
+        with self._mux:
+            if self._in_use:
+                raise ConnectionError('Connection is in use.')
+            self._in_use = True
 
         # choose highest version first
         if self.client.protocol_version is None:
@@ -289,7 +294,11 @@ class Connection:
             self.username,
             self.password
         )
-        self.send(hs_request)
+
+        with BinaryStream(self) as stream:
+            hs_request.from_python(stream)
+            self.send(stream.getbuffer())
+
         hs_response = self.read_response()
         if hs_response['op_code'] == 0:
             # disconnect but keep in use
@@ -345,12 +354,7 @@ class Connection:
         if not self.failed:
             return
 
-        # return connection to initial state regardless of use lock
-        self.close(release=False)
-        try:
-            self._in_use.release()
-        except RuntimeError:
-            pass
+        self.close()
 
         # connect and silence the connection errors
         try:
@@ -370,20 +374,7 @@ class Connection:
         to.host = self.host
         to.port = self.port
 
-    def clone(self, prefetch: bytes = b'') -> 'Connection':
-        """
-        Clones this connection in its current state.
-
-        :return: `Connection` object.
-        """
-        clone = self.__class__(self.client, **self.ssl_params)
-        self._transfer_params(to=clone)
-        if self.alive:
-            clone.connect(self.host, self.port)
-        clone.prefetch = prefetch
-        return clone
-
-    def send(self, data: bytes, flags=None):
+    def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
         """
         Send data down the socket.
 
@@ -396,70 +387,45 @@ class Connection:
         kwargs = {}
         if flags is not None:
             kwargs['flags'] = flags
-        data = bytes(data)
-        total_bytes_sent = 0
-
-        while total_bytes_sent < len(data):
-            try:
-                bytes_sent = self.socket.send(
-                    data[total_bytes_sent:],
-                    **kwargs
-                )
-            except connection_errors:
-                self._fail()
-                self.reconnect()
-                raise
-            if bytes_sent == 0:
-                self._fail()
-                self.reconnect()
-                raise SocketError('Connection broken.')
-            total_bytes_sent += bytes_sent
-
-    def recv(self, buffersize, flags=None) -> bytes:
-        """
-        Receive data from socket or read-ahead buffer.
 
-        :param buffersize: bytes to receive,
-        :param flags: (optional) OS-specific flags,
-        :return: data received.
-        """
+        try:
+            self.socket.sendall(data, **kwargs)
+        except Exception:
+            self._fail()
+            self.reconnect()
+            raise
+
+    def recv(self, flags=None) -> bytearray:
+        def _recv(buffer, num_bytes):
+            bytes_to_receive = num_bytes
+            while bytes_to_receive > 0:
+                try:
+                    bytes_rcvd = self.socket.recv_into(buffer, bytes_to_receive, **kwargs)
+                    if bytes_rcvd == 0:
+                        raise SocketError('Connection broken.')
+                except connection_errors:
+                    self._fail()
+                    self.reconnect()
+                    raise
+
+                buffer = buffer[bytes_rcvd:]
+                bytes_to_receive -= bytes_rcvd
+
         if self.closed:
             raise SocketError('Attempt to use closed connection.')
 
-        pref_size = len(self.prefetch)
-        if buffersize > pref_size:
-            result = self.prefetch
-            self.prefetch = b''
-            try:
-                result += self._recv(buffersize-pref_size, flags)
-            except connection_errors:
-                self._fail()
-                self.reconnect()
-                raise
-            return result
-        else:
-            result = self.prefetch[:buffersize]
-            self.prefetch = self.prefetch[buffersize:]
-            return result
-
-    def _recv(self, buffersize, flags=None) -> bytes:
-        """
-        Handle socket data reading.
-        """
         kwargs = {}
         if flags is not None:
             kwargs['flags'] = flags
-        chunks = []
-        bytes_rcvd = 0
 
-        while bytes_rcvd < buffersize:
-            chunk = self.socket.recv(buffersize-bytes_rcvd, **kwargs)
-            if chunk == b'':
-                raise SocketError('Connection broken.')
-            chunks.append(chunk)
-            bytes_rcvd += len(chunk)
+        data = bytearray(4)
+        _recv(memoryview(data), 4)
+        response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
+
+        data.extend(bytearray(response_len))
+        _recv(memoryview(data)[4:], response_len)
+        return data
 
-        return b''.join(chunks)
 
     def close(self, release=True):
         """
@@ -467,16 +433,14 @@ class Connection:
         not required, since sockets are automatically closed when
         garbage-collected.
         """
-        if release:
-            try:
-                self._in_use.release()
-            except RuntimeError:
-                pass
-
-        if self._socket:
-            try:
-                self._socket.shutdown(socket.SHUT_RDWR)
-                self._socket.close()
-            except connection_errors:
-                pass
-            self._socket = None
+        with self._mux:
+            if self._socket:
+                try:
+                    self._socket.shutdown(socket.SHUT_RDWR)
+                    self._socket.close()
+                except connection_errors:
+                    pass
+                self._socket = None
+
+            if release:
+                self._in_use = False
diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py
index 2e0264f..3315c4e 100644
--- a/pyignite/connection/handshake.py
+++ b/pyignite/connection/handshake.py
@@ -50,7 +50,7 @@ class HandshakeRequest:
             ])
         self.handshake_struct = Struct(fields)
 
-    def __bytes__(self) -> bytes:
+    def from_python(self, stream):
         handshake_data = {
             'length': 8,
             'op_code': OP_HANDSHAKE,
@@ -69,4 +69,5 @@ class HandshakeRequest:
                 len(self.username),
                 len(self.password),
             ])
-        return self.handshake_struct.from_python(handshake_data)
+
+        self.handshake_struct.from_python(stream, handshake_data)
diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py
index 5024f79..49860bd 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/datatypes/__init__.py
@@ -25,3 +25,22 @@ from .primitive import *
 from .primitive_arrays import *
 from .primitive_objects import *
 from .standard import *
+from ..stream import BinaryStream, READ_BACKWARD
+
+
+def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
+    """
+    Unwrap wrapped BinaryObject and convert it to Python data.
+
+    :param client: connection to Ignite cluster,
+    :param wrapped: `WrappedDataObject` value,
+    :return: dict representing wrapped BinaryObject.
+    """
+    from pyignite.datatypes.complex import BinaryObject
+
+    blob, offset = wrapped
+    with BinaryStream(client.random_node, blob) as stream:
+        data_class = BinaryObject.parse(stream)
+        result = BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client)
+
+    return result
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index e94db5f..eadaef9 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -92,10 +92,11 @@ class PropBase:
         )
 
     @classmethod
-    def parse(cls, connection: 'Connection'):
+    def parse(cls, stream):
+        init_pos = stream.tell()
         header_class = cls.build_header()
-        header_buffer = connection.recv(ctypes.sizeof(header_class))
-        data_class, data_buffer = cls.prop_data_class.parse(connection)
+        data_class = cls.prop_data_class.parse(stream)
+
         prop_class = type(
             cls.__name__,
             (header_class,),
@@ -106,7 +107,9 @@ class PropBase:
                 ],
             }
         )
-        return prop_class, header_buffer + data_buffer
+
+        stream.seek(init_pos + ctypes.sizeof(prop_class))
+        return prop_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -115,11 +118,12 @@ class PropBase:
         )
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         header_class = cls.build_header()
         header = header_class()
         header.prop_code = cls.prop_code
-        return bytes(header) + cls.prop_data_class.from_python(value)
+        stream.write(bytes(header))
+        cls.prop_data_class.from_python(stream, value)
 
 
 class PropName(PropBase):
@@ -275,7 +279,7 @@ class PropStatisticsEnabled(PropBase):
 class AnyProperty(PropBase):
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         raise Exception(
             'You must choose a certain type '
             'for your cache configuration property'
diff --git a/pyignite/datatypes/complex.py b/pyignite/datatypes/complex.py
index 6860583..aed3cda 100644
--- a/pyignite/datatypes/complex.py
+++ b/pyignite/datatypes/complex.py
@@ -15,27 +15,27 @@
 
 from collections import OrderedDict
 import ctypes
-import inspect
+from io import SEEK_CUR
 from typing import Iterable, Dict
 
 from pyignite.constants import *
 from pyignite.exceptions import ParseError
-
 from .base import IgniteDataType
 from .internal import AnyDataObject, infer_from_python
 from .type_codes import *
 from .type_ids import *
 from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
 
 __all__ = [
     'Map', 'ObjectArrayObject', 'CollectionObject', 'MapObject',
     'WrappedDataObject', 'BinaryObject',
 ]
 
+from ..stream import BinaryStream
+
 
-class ObjectArrayObject(IgniteDataType):
+class ObjectArrayObject(IgniteDataType, Nullable):
     """
     Array of Ignite objects of any consistent type. Its Python representation
     is tuple(type_id, iterable of any type). The only type ID that makes sense
@@ -69,20 +69,14 @@ class ObjectArrayObject(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
-        fields = []
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
 
+        fields = []
         for i in range(header.length):
-            c_type, buffer_fragment = AnyDataObject.parse(client)
-            buffer += buffer_fragment
+            c_type = AnyDataObject.parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         final_class = type(
@@ -93,15 +87,13 @@ class ObjectArrayObject(IgniteDataType):
                 '_fields_': fields,
             }
         )
-        return final_class, buffer
+
+        return final_class
 
     @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
+    def to_python_not_null(cls, ctype_object, *args, **kwargs):
         result = []
-        length = getattr(ctype_object, "length", None)
-        if length is None:
-            return None
-        for i in range(length):
+        for i in range(ctype_object.length):
             result.append(
                 AnyDataObject.to_python(
                     getattr(ctype_object, 'element_{}'.format(i)),
@@ -111,10 +103,7 @@ class ObjectArrayObject(IgniteDataType):
         return ctype_object.type_id, result
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value):
         type_or_id, value = value
         header_class = cls.build_header()
         header = header_class()
@@ -129,14 +118,13 @@ class ObjectArrayObject(IgniteDataType):
             length = 1
         header.length = length
         header.type_id = type_or_id
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += infer_from_python(x)
-        return bytes(buffer)
+            infer_from_python(stream, x)
 
 
-class WrappedDataObject(IgniteDataType):
+class WrappedDataObject(IgniteDataType, Nullable):
     """
     One or more binary objects can be wrapped in an array. This allows reading,
     storing, passing and writing objects efficiently without understanding
@@ -162,15 +150,9 @@ class WrappedDataObject(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
+        header = stream.read_ctype(header_class)
 
         final_class = type(
             cls.__name__,
@@ -183,21 +165,20 @@ class WrappedDataObject(IgniteDataType):
                 ],
             }
         )
-        buffer += client.recv(
-            ctypes.sizeof(final_class) - ctypes.sizeof(header_class)
-        )
-        return final_class, buffer
+
+        stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
         return bytes(ctype_object.payload), ctype_object.offset
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         raise ParseError('Send unwrapped data.')
 
 
-class CollectionObject(IgniteDataType):
+class CollectionObject(IgniteDataType, Nullable):
     """
     Similar to object array, but contains platform-agnostic deserialization
     type hint instead of type ID.
@@ -260,20 +241,14 @@ class CollectionObject(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
-        fields = []
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
 
+        fields = []
         for i in range(header.length):
-            c_type, buffer_fragment = AnyDataObject.parse(client)
-            buffer += buffer_fragment
+            c_type = AnyDataObject.parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         final_class = type(
@@ -284,7 +259,7 @@ class CollectionObject(IgniteDataType):
                 '_fields_': fields,
             }
         )
-        return final_class, buffer
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -302,10 +277,7 @@ class CollectionObject(IgniteDataType):
         return ctype_object.type, result
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value):
         type_or_id, value = value
         header_class = cls.build_header()
         header = header_class()
@@ -320,14 +292,13 @@ class CollectionObject(IgniteDataType):
             length = 1
         header.length = length
         header.type = type_or_id
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += infer_from_python(x)
-        return bytes(buffer)
+            infer_from_python(stream, x)
 
 
-class Map(IgniteDataType):
+class Map(IgniteDataType, Nullable):
     """
     Dictionary type, payload-only.
 
@@ -358,20 +329,14 @@ class Map(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
-        fields = []
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
 
+        fields = []
         for i in range(header.length << 1):
-            c_type, buffer_fragment = AnyDataObject.parse(client)
-            buffer += buffer_fragment
+            c_type = AnyDataObject.parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         final_class = type(
@@ -382,7 +347,7 @@ class Map(IgniteDataType):
                 '_fields_': fields,
             }
         )
-        return final_class, buffer
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -402,7 +367,7 @@ class Map(IgniteDataType):
         return result
 
     @classmethod
-    def from_python(cls, value, type_id=None):
+    def from_python(cls, stream, value, type_id=None):
         header_class = cls.build_header()
         header = header_class()
         length = len(value)
@@ -414,12 +379,11 @@ class Map(IgniteDataType):
             )
         if hasattr(header, 'type'):
             header.type = type_id
-        buffer = bytearray(header)
 
+        stream.write(header)
         for k, v in value.items():
-            buffer += infer_from_python(k)
-            buffer += infer_from_python(v)
-        return bytes(buffer)
+            infer_from_python(stream, k)
+            infer_from_python(stream, v)
 
 
 class MapObject(Map):
@@ -462,15 +426,16 @@ class MapObject(Map):
         )
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         if value is None:
-            return Null.from_python()
+            Null.from_python(stream)
+            return
 
         type_id, value = value
-        return super().from_python(value, type_id)
+        super().from_python(stream, value, type_id)
 
 
-class BinaryObject(IgniteDataType):
+class BinaryObject(IgniteDataType, Nullable):
     _type_id = TYPE_BINARY_OBJ
     type_code = TC_COMPLEX_OBJECT
 
@@ -482,42 +447,14 @@ class BinaryObject(IgniteDataType):
     COMPACT_FOOTER = 0x0020
 
     @staticmethod
-    def find_client():
-        """
-        A nice hack. Extracts the nearest `Client` instance from the
-        call stack.
-        """
-        from pyignite import Client
-        from pyignite.connection import Connection
-
-        frame = None
-        try:
-            for rec in inspect.stack()[2:]:
-                frame = rec[0]
-                code = frame.f_code
-                for varname in code.co_varnames:
-                    suspect = frame.f_locals.get(varname)
-                    if isinstance(suspect, Client):
-                        return suspect
-                    if isinstance(suspect, Connection):
-                        return suspect.client
-        finally:
-            del frame
-
-    @staticmethod
-    def hashcode(
-        value: object, client: 'Client' = None, *args, **kwargs
-    ) -> int:
+    def hashcode(value: object, client: None) -> int:
         # binary objects's hashcode implementation is special in the sense
         # that you need to fully serialize the object to calculate
         # its hashcode
-        if value._hashcode is None:
+        if not value._hashcode and client :
 
-            # …and for to serialize it you need a Client instance
-            if client is None:
-                client = BinaryObject.find_client()
-
-            value._build(client)
+            with BinaryStream(client.random_node) as stream:
+                value._from_python(stream, save_to_buf=True)
 
         return value._hashcode
 
@@ -565,41 +502,25 @@ class BinaryObject(IgniteDataType):
             },
         )
 
-    @staticmethod
-    def get_dataclass(conn: 'Connection', header) -> OrderedDict:
-        # get field names from outer space
-        result = conn.client.query_binary_type(
-            header.type_id,
-            header.schema_id
-        )
-        if not result:
-            raise ParseError('Binary type is not registered')
-        return result
-
     @classmethod
-    def parse(cls, client: 'Client'):
+    def parse_not_null(cls, stream):
         from pyignite.datatypes import Struct
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
 
         header_class = cls.build_header()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
 
         # ignore full schema, always retrieve fields' types and order
         # from complex types registry
-        data_class = cls.get_dataclass(client, header)
+        data_class = stream.get_dataclass(header)
         fields = data_class.schema.items()
         object_fields_struct = Struct(fields)
-        object_fields, object_fields_buffer = object_fields_struct.parse(client)
-        buffer += object_fields_buffer
+        object_fields = object_fields_struct.parse(stream)
         final_class_fields = [('object_fields', object_fields)]
 
         if header.flags & cls.HAS_SCHEMA:
             schema = cls.schema_type(header.flags) * len(fields)
-            buffer += client.recv(ctypes.sizeof(schema))
+            stream.seek(ctypes.sizeof(schema), SEEK_CUR)
             final_class_fields.append(('schema', schema))
 
         final_class = type(
@@ -611,8 +532,8 @@ class BinaryObject(IgniteDataType):
             }
         )
         # register schema encoding approach
-        client.compact_footer = bool(header.flags & cls.COMPACT_FOOTER)
-        return final_class, buffer
+        stream.compact_footer = bool(header.flags & cls.COMPACT_FOOTER)
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, client: 'Client' = None, *args, **kwargs):
@@ -642,23 +563,9 @@ class BinaryObject(IgniteDataType):
         return result
 
     @classmethod
-    def from_python(cls, value: object):
-        if value is None:
-            return Null.from_python()
-
-        if getattr(value, '_buffer', None) is None:
-            client = cls.find_client()
-
-            # if no client can be found, the class of the `value` is discarded
-            # and the new dataclass is automatically registered later on
-            if client:
-                client.register_binary_type(value.__class__)
-            else:
-                raise Warning(
-                    'Can not register binary type {}'.format(value.type_name)
-                )
-
-            # build binary representation
-            value._build(client)
-
-        return value._buffer
+    def from_python_not_null(cls, stream, value):
+        stream.register_binary_type(value.__class__)
+        if getattr(value, '_buffer', None):
+            stream.write(value._buffer)
+        else:
+            value._from_python(stream)
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 23b9cc4..0111a22 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -17,6 +17,7 @@ from collections import OrderedDict
 import ctypes
 import decimal
 from datetime import date, datetime, timedelta
+from io import SEEK_CUR
 from typing import Any, Tuple, Union, Callable
 import uuid
 
@@ -33,6 +34,8 @@ __all__ = [
     'infer_from_python',
 ]
 
+from ..stream import READ_BACKWARD
+
 
 def tc_map(key: bytes, _memo_map: dict = {}):
     """
@@ -119,11 +122,12 @@ class Conditional:
         self.var1 = var1
         self.var2 = var2
 
-    def parse(self, client: 'Client', context):
-        return self.var1.parse(client) if self.predicate1(context) else self.var2.parse(client)
+    def parse(self, stream, context):
+        return self.var1.parse(stream) if self.predicate1(context) else self.var2.parse(stream)
 
     def to_python(self, ctype_object, context, *args, **kwargs):
-        return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context) else self.var2.to_python(ctype_object, *args, **kwargs)
+        return self.var1.to_python(ctype_object, *args, **kwargs) if self.predicate2(context)\
+            else self.var2.to_python(ctype_object, *args, **kwargs)
 
 @attr.s
 class StructArray:
@@ -144,14 +148,17 @@ class StructArray:
             },
         )
 
-    def parse(self, client: 'Client'):
-        buffer = client.recv(ctypes.sizeof(self.counter_type))
-        length = int.from_bytes(buffer, byteorder=PROTOCOL_BYTE_ORDER)
-        fields = []
+    def parse(self, stream):
+        counter_type_len = ctypes.sizeof(self.counter_type)
+        length = int.from_bytes(
+            stream.mem_view(offset=counter_type_len),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
+        stream.seek(counter_type_len, SEEK_CUR)
 
+        fields = []
         for i in range(length):
-            c_type, buffer_fragment = Struct(self.following).parse(client)
-            buffer += buffer_fragment
+            c_type = Struct(self.following).parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         data_class = type(
@@ -163,7 +170,7 @@ class StructArray:
             },
         )
 
-        return data_class, buffer
+        return data_class
 
     def to_python(self, ctype_object, *args, **kwargs):
         result = []
@@ -179,20 +186,19 @@ class StructArray:
             )
         return result
 
-    def from_python(self, value):
+    def from_python(self, stream, value):
         length = len(value)
         header_class = self.build_header_class()
         header = header_class()
         header.length = length
-        buffer = bytearray(header)
 
+
+        stream.write(header)
         for i, v in enumerate(value):
             for default_key, default_value in self.defaults.items():
                 v.setdefault(default_key, default_value)
             for name, el_class in self.following:
-                buffer += el_class.from_python(v[name])
-
-        return bytes(buffer)
+                el_class.from_python(stream, v[name])
 
 
 @attr.s
@@ -202,21 +208,13 @@ class Struct:
     dict_type = attr.ib(default=OrderedDict)
     defaults = attr.ib(type=dict, default={})
 
-    def parse(
-        self, client: 'Client'
-    ) -> Tuple[ctypes.LittleEndianStructure, bytes]:
-        buffer = b''
-        fields = []
-        values = {}
-
+    def parse(self, stream):
+        fields, values = [], {}
         for name, c_type in self.fields:
             is_cond = isinstance(c_type, Conditional)
-            c_type, buffer_fragment = c_type.parse(client, values) if is_cond else c_type.parse(client)
-            buffer += buffer_fragment
-
+            c_type = c_type.parse(stream, values) if is_cond else c_type.parse(stream)
             fields.append((name, c_type))
-
-            values[name] = buffer_fragment
+            values[name] = stream.read_ctype(c_type, direction=READ_BACKWARD)
 
         data_class = type(
             'Struct',
@@ -227,7 +225,7 @@ class Struct:
             },
         )
 
-        return data_class, buffer
+        return data_class
 
     def to_python(
         self, ctype_object, *args, **kwargs
@@ -245,16 +243,12 @@ class Struct:
             )
         return result
 
-    def from_python(self, value) -> bytes:
-        buffer = b''
-
+    def from_python(self, stream, value):
         for default_key, default_value in self.defaults.items():
             value.setdefault(default_key, default_value)
 
         for name, el_class in self.fields:
-            buffer += el_class.from_python(value[name])
-
-        return buffer
+            el_class.from_python(stream, value[name])
 
 
 class AnyDataObject:
@@ -299,14 +293,13 @@ class AnyDataObject:
             return type_first
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        type_code = client.recv(ctypes.sizeof(ctypes.c_byte))
+    def parse(cls, stream):
+        type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte)))
         try:
             data_class = tc_map(type_code)
         except KeyError:
             raise ParseError('Unknown type code: `{}`'.format(type_code))
-        client.prefetch += type_code
-        return data_class.parse(client)
+        return data_class.parse(stream)
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -418,11 +411,12 @@ class AnyDataObject:
         )
 
     @classmethod
-    def from_python(cls, value):
-        return cls.map_python_type(value).from_python(value)
+    def from_python(cls, stream, value):
+        p_type = cls.map_python_type(value)
+        p_type.from_python(stream, value)
 
 
-def infer_from_python(value: Any):
+def infer_from_python(stream, value: Any):
     """
     Convert pythonic value to ctypes buffer, type hint-aware.
 
@@ -433,7 +427,8 @@ def infer_from_python(value: Any):
         value, data_type = value
     else:
         data_type = AnyDataObject
-    return data_type.from_python(value)
+
+    data_type.from_python(stream, value)
 
 
 @attr.s
@@ -455,15 +450,14 @@ class AnyDataArray(AnyDataObject):
             }
         )
 
-    def parse(self, client: 'Client'):
+    def parse(self, stream):
         header_class = self.build_header()
-        buffer = client.recv(ctypes.sizeof(header_class))
-        header = header_class.from_buffer_copy(buffer)
-        fields = []
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
 
+        fields = []
         for i in range(header.length):
-            c_type, buffer_fragment = super().parse(client)
-            buffer += buffer_fragment
+            c_type = super().parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         final_class = type(
@@ -474,7 +468,7 @@ class AnyDataArray(AnyDataObject):
                 '_fields_': fields,
             }
         )
-        return final_class, buffer
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -491,7 +485,7 @@ class AnyDataArray(AnyDataObject):
             )
         return result
 
-    def from_python(self, value):
+    def from_python(self, stream, value):
         header_class = self.build_header()
         header = header_class()
 
@@ -501,8 +495,7 @@ class AnyDataArray(AnyDataObject):
             value = [value]
             length = 1
         header.length = length
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += infer_from_python(x)
-        return bytes(buffer)
+            infer_from_python(stream, x)
diff --git a/pyignite/datatypes/null_object.py b/pyignite/datatypes/null_object.py
index 19b41c7..912ded8 100644
--- a/pyignite/datatypes/null_object.py
+++ b/pyignite/datatypes/null_object.py
@@ -20,6 +20,7 @@ There can't be null type, because null payload takes exactly 0 bytes.
 """
 
 import ctypes
+from io import SEEK_CUR
 from typing import Any
 
 from .base import IgniteDataType
@@ -28,6 +29,8 @@ from .type_codes import TC_NULL
 
 __all__ = ['Null']
 
+from ..constants import PROTOCOL_BYTE_ORDER
+
 
 class Null(IgniteDataType):
     default = None
@@ -55,16 +58,56 @@ class Null(IgniteDataType):
         return cls._object_c_type
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        buffer = client.recv(ctypes.sizeof(ctypes.c_byte))
-        data_type = cls.build_c_type()
-        return data_type, buffer
+    def parse(cls, stream):
+        init_pos, offset = stream.tell(), ctypes.sizeof(ctypes.c_byte)
+        stream.seek(offset, SEEK_CUR)
+        return cls.build_c_type()
 
     @staticmethod
     def to_python(*args, **kwargs):
         return None
 
     @staticmethod
-    def from_python(*args):
-        return TC_NULL
+    def from_python(stream, *args):
+        stream.write(TC_NULL)
+
+
+class Nullable:
+    @classmethod
+    def parse_not_null(cls, stream):
+        raise NotImplementedError
+
+    @classmethod
+    def parse(cls, stream):
+        type_len = ctypes.sizeof(ctypes.c_byte)
+
+        if stream.mem_view(offset=type_len) == TC_NULL:
+            stream.seek(type_len, SEEK_CUR)
+            return Null.build_c_type()
+
+        return cls.parse_not_null(stream)
 
+    @classmethod
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
+        raise NotImplementedError
+
+    @classmethod
+    def to_python(cls, ctypes_object, *args, **kwargs):
+        if ctypes_object.type_code == int.from_bytes(
+                TC_NULL,
+                byteorder=PROTOCOL_BYTE_ORDER
+        ):
+            return None
+
+        return cls.to_python_not_null(ctypes_object, *args, **kwargs)
+
+    @classmethod
+    def from_python_not_null(cls, stream, value):
+        raise NotImplementedError
+
+    @classmethod
+    def from_python(cls, stream, value):
+        if value is None:
+            Null.from_python(stream)
+        else:
+            cls.from_python_not_null(stream, value)
diff --git a/pyignite/datatypes/primitive.py b/pyignite/datatypes/primitive.py
index d549fda..ffa2e32 100644
--- a/pyignite/datatypes/primitive.py
+++ b/pyignite/datatypes/primitive.py
@@ -15,7 +15,7 @@
 
 import ctypes
 import struct
-import sys
+from io import SEEK_CUR
 
 from pyignite.constants import *
 from .base import IgniteDataType
@@ -47,8 +47,10 @@ class Primitive(IgniteDataType):
     c_type = None
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        return cls.c_type, client.recv(ctypes.sizeof(cls.c_type))
+    def parse(cls, stream):
+        init_pos, offset = stream.tell(), ctypes.sizeof(cls.c_type)
+        stream.seek(offset, SEEK_CUR)
+        return cls.c_type
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
@@ -61,8 +63,8 @@ class Byte(Primitive):
     c_type = ctypes.c_byte
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<b", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<b", value))
 
 
 class Short(Primitive):
@@ -71,8 +73,8 @@ class Short(Primitive):
     c_type = ctypes.c_short
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<h", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<h", value))
 
 
 class Int(Primitive):
@@ -81,8 +83,8 @@ class Int(Primitive):
     c_type = ctypes.c_int
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<i", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<i", value))
 
 
 class Long(Primitive):
@@ -91,8 +93,8 @@ class Long(Primitive):
     c_type = ctypes.c_longlong
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<q", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<q", value))
 
 
 class Float(Primitive):
@@ -101,8 +103,8 @@ class Float(Primitive):
     c_type = ctypes.c_float
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<f", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<f", value))
 
 
 class Double(Primitive):
@@ -111,8 +113,8 @@ class Double(Primitive):
     c_type = ctypes.c_double
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<d", value)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<d", value))
 
 
 class Char(Primitive):
@@ -128,16 +130,15 @@ class Char(Primitive):
         ).decode(PROTOCOL_CHAR_ENCODING)
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         if type(value) is str:
             value = value.encode(PROTOCOL_CHAR_ENCODING)
         # assuming either a bytes or an integer
         if type(value) is bytes:
             value = int.from_bytes(value, byteorder=PROTOCOL_BYTE_ORDER)
         # assuming a valid integer
-        return value.to_bytes(
-            ctypes.sizeof(cls.c_type),
-            byteorder=PROTOCOL_BYTE_ORDER
+        stream.write(
+            value.to_bytes(ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER)
         )
 
 
@@ -151,5 +152,5 @@ class Bool(Primitive):
         return ctype_object != 0
 
     @classmethod
-    def from_python(cls, value):
-        return struct.pack("<b", 1 if value else 0)
+    def from_python(cls, stream, value):
+        stream.write(struct.pack("<b", 1 if value else 0))
diff --git a/pyignite/datatypes/primitive_arrays.py b/pyignite/datatypes/primitive_arrays.py
index 1b41728..7cb5b20 100644
--- a/pyignite/datatypes/primitive_arrays.py
+++ b/pyignite/datatypes/primitive_arrays.py
@@ -14,11 +14,13 @@
 # limitations under the License.
 
 import ctypes
+from io import SEEK_CUR
 from typing import Any
 
 from pyignite.constants import *
 from . import Null
 from .base import IgniteDataType
+from .null_object import Nullable
 from .primitive import *
 from .type_codes import *
 from .type_ids import *
@@ -33,7 +35,7 @@ __all__ = [
 ]
 
 
-class PrimitiveArray(IgniteDataType):
+class PrimitiveArray(IgniteDataType, Nullable):
     """
     Base class for array of primitives. Payload-only.
     """
@@ -61,15 +63,10 @@ class PrimitiveArray(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header_class()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
+        header = stream.read_ctype(header_class)
+
         final_class = type(
             cls.__name__,
             (header_class,),
@@ -80,26 +77,18 @@ class PrimitiveArray(IgniteDataType):
                 ],
             }
         )
-        buffer += client.recv(
-            ctypes.sizeof(final_class) - ctypes.sizeof(header_class)
-        )
-        return final_class, buffer
+        stream.seek(ctypes.sizeof(final_class), SEEK_CUR)
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
-        result = []
         length = getattr(ctype_object, "length", None)
         if length is None:
             return None
-        for i in range(length):
-            result.append(ctype_object.data[i])
-        return result
+        return [ctype_object.data[i] for i in range(ctype_object.length)]
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value):
         header_class = cls.build_header_class()
         header = header_class()
         if hasattr(header, 'type_code'):
@@ -109,11 +98,10 @@ class PrimitiveArray(IgniteDataType):
             )
         length = len(value)
         header.length = length
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += cls.primitive_type.from_python(x)
-        return bytes(buffer)
+            cls.primitive_type.from_python(stream, x)
 
 
 class ByteArray(PrimitiveArray):
@@ -130,14 +118,13 @@ class ByteArray(PrimitiveArray):
         return bytearray(data)
 
     @classmethod
-    def from_python(cls, value):
+    def from_python(cls, stream, value):
         header_class = cls.build_header_class()
         header = header_class()
-
-        # no need to iterate on bytes or bytearray
-        # to create ByteArray data buffer
         header.length = len(value)
-        return bytes(bytearray(header) + bytearray(value))
+
+        stream.write(header)
+        stream.write(bytearray(value))
 
 
 class ShortArray(PrimitiveArray):
@@ -224,20 +211,20 @@ class ByteArrayObject(PrimitiveArrayObject):
         return ByteArray.to_python(ctype_object, *args, **kwargs)
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value):
         header_class = cls.build_header_class()
         header = header_class()
         header.type_code = int.from_bytes(
             cls.type_code,
             byteorder=PROTOCOL_BYTE_ORDER
         )
-
-        # no need to iterate on bytes or bytearray
-        # to create ByteArrayObject data buffer
         header.length = len(value)
+        stream.write(header)
+
+        if isinstance(value, (bytes, bytearray)):
+            stream.write(value)
+            return
+
         try:
             # `value` is a `bytearray` or a sequence of integer values
             # in range 0 to 255
@@ -253,7 +240,7 @@ class ByteArrayObject(PrimitiveArrayObject):
                         'byte must be in range(-128, 256)!'
                     ) from None
 
-        return bytes(bytearray(header) + value_buffer)
+        stream.write(value_buffer)
 
 
 class ShortArrayObject(PrimitiveArrayObject):
diff --git a/pyignite/datatypes/primitive_objects.py b/pyignite/datatypes/primitive_objects.py
index 53f12d2..e942dd7 100644
--- a/pyignite/datatypes/primitive_objects.py
+++ b/pyignite/datatypes/primitive_objects.py
@@ -14,16 +14,15 @@
 # limitations under the License.
 
 import ctypes
+from io import SEEK_CUR
 
 from pyignite.constants import *
 from pyignite.utils import unsigned
-
 from .base import IgniteDataType
 from .type_codes import *
 from .type_ids import *
 from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
 
 __all__ = [
     'DataObject', 'ByteObject', 'ShortObject', 'IntObject', 'LongObject',
@@ -31,7 +30,7 @@ __all__ = [
 ]
 
 
-class DataObject(IgniteDataType):
+class DataObject(IgniteDataType, Nullable):
     """
     Base class for primitive data objects.
 
@@ -61,22 +60,17 @@ class DataObject(IgniteDataType):
         return cls._object_c_type
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
+    def parse_not_null(cls, stream):
         data_type = cls.build_c_type()
-        buffer = tc_type + client.recv(ctypes.sizeof(data_type) - len(tc_type))
-        return data_type, buffer
+        stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+        return data_type
 
     @staticmethod
     def to_python(ctype_object, *args, **kwargs):
         return getattr(ctype_object, "value", None)
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -84,7 +78,7 @@ class DataObject(IgniteDataType):
             byteorder=PROTOCOL_BYTE_ORDER
         )
         data_object.value = value
-        return bytes(data_object)
+        stream.write(data_object)
 
 
 class ByteObject(DataObject):
@@ -201,18 +195,16 @@ class CharObject(DataObject):
         ).decode(PROTOCOL_CHAR_ENCODING)
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value):
         if type(value) is str:
             value = value.encode(PROTOCOL_CHAR_ENCODING)
         # assuming either a bytes or an integer
         if type(value) is bytes:
             value = int.from_bytes(value, byteorder=PROTOCOL_BYTE_ORDER)
         # assuming a valid integer
-        return cls.type_code + value.to_bytes(
-            ctypes.sizeof(cls.c_type),
-            byteorder=PROTOCOL_BYTE_ORDER
+        stream.write(cls.type_code)
+        stream.write(
+            value.to_bytes(ctypes.sizeof(cls.c_type), byteorder=PROTOCOL_BYTE_ORDER)
         )
 
 
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 0f16735..af50a8e 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -16,6 +16,7 @@
 import ctypes
 from datetime import date, datetime, time, timedelta
 import decimal
+from io import SEEK_CUR
 from math import ceil
 from typing import Any, Tuple
 import uuid
@@ -26,8 +27,7 @@ from .base import IgniteDataType
 from .type_codes import *
 from .type_ids import *
 from .type_names import *
-from .null_object import Null
-
+from .null_object import Null, Nullable
 
 __all__ = [
     'String', 'DecimalObject', 'UUIDObject', 'TimestampObject', 'DateObject',
@@ -44,7 +44,7 @@ __all__ = [
 ]
 
 
-class StandardObject(IgniteDataType):
+class StandardObject(IgniteDataType, Nullable):
     _type_name = None
     _type_id = None
     type_code = None
@@ -54,18 +54,13 @@ class StandardObject(IgniteDataType):
         raise NotImplementedError('This object is generic')
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
-        c_type = cls.build_c_type()
-        buffer = tc_type + client.recv(ctypes.sizeof(c_type) - len(tc_type))
-        return c_type, buffer
+    def parse_not_null(cls, stream):
+        data_type = cls.build_c_type()
+        stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+        return data_type
 
 
-class String(IgniteDataType):
+class String(IgniteDataType, Nullable):
     """
     Pascal-style string: `c_int` counter, followed by count*bytes.
     UTF-8-encoded, so that one character may take 1 to 4 bytes.
@@ -95,35 +90,25 @@ class String(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-        # String or Null
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
-        buffer = tc_type + client.recv(ctypes.sizeof(ctypes.c_int))
-        length = int.from_bytes(buffer[1:], byteorder=PROTOCOL_BYTE_ORDER)
+    def parse_not_null(cls, stream):
+        length = int.from_bytes(
+            stream.mem_view(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
+            byteorder=PROTOCOL_BYTE_ORDER
+        )
 
         data_type = cls.build_c_type(length)
-        buffer += client.recv(ctypes.sizeof(data_type) - len(buffer))
+        stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+        return data_type
 
-        return data_type, buffer
-
-    @staticmethod
-    def to_python(ctype_object, *args, **kwargs):
-        length = getattr(ctype_object, 'length', None)
-        if length is None:
-            return None
-        elif length > 0:
+    @classmethod
+    def to_python_not_null(cls, ctype_object, *args, **kwargs):
+        if ctype_object.length > 0:
             return ctype_object.data.decode(PROTOCOL_STRING_ENCODING)
-        else:
-            return ''
 
-    @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
+        return ''
 
+    @classmethod
+    def from_python_not_null(cls, stream, value):
         if isinstance(value, str):
             value = value.encode(PROTOCOL_STRING_ENCODING)
         length = len(value)
@@ -135,10 +120,11 @@ class String(IgniteDataType):
         )
         data_object.length = length
         data_object.data = value
-        return bytes(data_object)
+
+        stream.write(data_object)
 
 
-class DecimalObject(IgniteDataType):
+class DecimalObject(IgniteDataType, Nullable):
     _type_name = NAME_DECIMAL
     _type_id = TYPE_DECIMAL
     type_code = TC_DECIMAL
@@ -165,18 +151,10 @@ class DecimalObject(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-        # Decimal or Null
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_c_header()
-        buffer = tc_type + client.recv(
-            ctypes.sizeof(header_class)
-            - len(tc_type)
-        )
-        header = header_class.from_buffer_copy(buffer)
+        header = stream.read_ctype(header_class)
+
         data_type = type(
             cls.__name__,
             (header_class,),
@@ -187,17 +165,12 @@ class DecimalObject(IgniteDataType):
                 ],
             }
         )
-        buffer += client.recv(
-            ctypes.sizeof(data_type)
-            - ctypes.sizeof(header_class)
-        )
-        return data_type, buffer
 
-    @classmethod
-    def to_python(cls, ctype_object, *args, **kwargs):
-        if getattr(ctype_object, 'length', None) is None:
-            return None
+        stream.seek(ctypes.sizeof(data_type), SEEK_CUR)
+        return data_type
 
+    @classmethod
+    def to_python_not_null(cls, ctype_object, *args, **kwargs):
         sign = 1 if ctype_object.data[0] & 0x80 else 0
         data = ctype_object.data[1:]
         data.insert(0, ctype_object.data[0] & 0x7f)
@@ -218,10 +191,7 @@ class DecimalObject(IgniteDataType):
         return result
 
     @classmethod
-    def from_python(cls, value: decimal.Decimal):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value: decimal.Decimal):
         sign, digits, scale = value.normalize().as_tuple()
         integer = int(''.join([str(d) for d in digits]))
         # calculate number of bytes (at least one, and not forget the sign bit)
@@ -257,7 +227,8 @@ class DecimalObject(IgniteDataType):
         data_object.scale = -scale
         for i in range(length):
             data_object.data[i] = data[i]
-        return bytes(data_object)
+
+        stream.write(data_object)
 
 
 class UUIDObject(StandardObject):
@@ -300,10 +271,7 @@ class UUIDObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python(cls, value: uuid.UUID):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value: uuid.UUID):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -312,15 +280,11 @@ class UUIDObject(StandardObject):
         )
         for i, byte in zip(cls.UUID_BYTE_ORDER, bytearray(value.bytes)):
             data_object.value[i] = byte
-        return bytes(data_object)
+
+        stream.write(data_object)
 
     @classmethod
-    def to_python(cls, ctypes_object, *args, **kwargs):
-        if ctypes_object.type_code == int.from_bytes(
-            TC_NULL,
-            byteorder=PROTOCOL_BYTE_ORDER
-        ):
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         uuid_array = bytearray(ctypes_object.value)
         return uuid.UUID(
             bytes=bytes([uuid_array[i] for i in cls.UUID_BYTE_ORDER])
@@ -367,9 +331,7 @@ class TimestampObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python(cls, value: tuple):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value: tuple):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -378,15 +340,11 @@ class TimestampObject(StandardObject):
         )
         data_object.epoch = int(value[0].timestamp() * 1000)
         data_object.fraction = value[1]
-        return bytes(data_object)
+
+        stream.write(data_object)
 
     @classmethod
-    def to_python(cls, ctypes_object, *args, **kwargs):
-        if ctypes_object.type_code == int.from_bytes(
-            TC_NULL,
-            byteorder=PROTOCOL_BYTE_ORDER
-        ):
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         return (
             datetime.fromtimestamp(ctypes_object.epoch/1000),
             ctypes_object.fraction
@@ -428,9 +386,7 @@ class DateObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python(cls, value: [date, datetime]):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value: [date, datetime]):
         if type(value) is date:
             value = datetime.combine(value, time())
         data_type = cls.build_c_type()
@@ -440,15 +396,11 @@ class DateObject(StandardObject):
             byteorder=PROTOCOL_BYTE_ORDER
         )
         data_object.epoch = int(value.timestamp() * 1000)
-        return bytes(data_object)
+
+        stream.write(data_object)
 
     @classmethod
-    def to_python(cls, ctypes_object, *args, **kwargs):
-        if ctypes_object.type_code == int.from_bytes(
-            TC_NULL,
-            byteorder=PROTOCOL_BYTE_ORDER
-        ):
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         return datetime.fromtimestamp(ctypes_object.epoch/1000)
 
 
@@ -486,9 +438,7 @@ class TimeObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python(cls, value: timedelta):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value: timedelta):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -496,15 +446,11 @@ class TimeObject(StandardObject):
             byteorder=PROTOCOL_BYTE_ORDER
         )
         data_object.value = int(value.total_seconds() * 1000)
-        return bytes(data_object)
+
+        stream.write(data_object)
 
     @classmethod
-    def to_python(cls, ctypes_object, *args, **kwargs):
-        if ctypes_object.type_code == int.from_bytes(
-            TC_NULL,
-            byteorder=PROTOCOL_BYTE_ORDER
-        ):
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         return timedelta(milliseconds=ctypes_object.value)
 
 
@@ -539,10 +485,7 @@ class EnumObject(StandardObject):
         return cls._object_c_type
 
     @classmethod
-    def from_python(cls, value: tuple):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value: tuple):
         data_type = cls.build_c_type()
         data_object = data_type()
         data_object.type_code = int.from_bytes(
@@ -550,15 +493,11 @@ class EnumObject(StandardObject):
             byteorder=PROTOCOL_BYTE_ORDER
         )
         data_object.type_id, data_object.ordinal = value
-        return bytes(data_object)
+
+        stream.write(data_object)
 
     @classmethod
-    def to_python(cls, ctypes_object, *args, **kwargs):
-        if ctypes_object.type_code == int.from_bytes(
-            TC_NULL,
-            byteorder=PROTOCOL_BYTE_ORDER
-        ):
-            return None
+    def to_python_not_null(cls, ctypes_object, *args, **kwargs):
         return ctypes_object.type_id, ctypes_object.ordinal
 
 
@@ -571,7 +510,7 @@ class BinaryEnumObject(EnumObject):
     type_code = TC_BINARY_ENUM
 
 
-class StandardArray(IgniteDataType):
+class StandardArray(IgniteDataType, Nullable):
     """
     Base class for array of primitives. Payload-only.
     """
@@ -599,19 +538,14 @@ class StandardArray(IgniteDataType):
         )
 
     @classmethod
-    def parse(cls, client: 'Client'):
-        tc_type = client.recv(ctypes.sizeof(ctypes.c_byte))
-
-        if tc_type == TC_NULL:
-            return Null.build_c_type(), tc_type
-
+    def parse_not_null(cls, stream):
         header_class = cls.build_header_class()
-        buffer = tc_type + client.recv(ctypes.sizeof(header_class) - len(tc_type))
-        header = header_class.from_buffer_copy(buffer)
+        header = stream.read_ctype(header_class)
+        stream.seek(ctypes.sizeof(header_class), SEEK_CUR)
+
         fields = []
         for i in range(header.length):
-            c_type, buffer_fragment = cls.standard_type.parse(client)
-            buffer += buffer_fragment
+            c_type = cls.standard_type.parse(stream)
             fields.append(('element_{}'.format(i), c_type))
 
         final_class = type(
@@ -622,14 +556,15 @@ class StandardArray(IgniteDataType):
                 '_fields_': fields,
             }
         )
-        return final_class, buffer
+        return final_class
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
-        result = []
         length = getattr(ctype_object, "length", None)
         if length is None:
             return None
+
+        result = []
         for i in range(length):
             result.append(
                 cls.standard_type.to_python(
@@ -640,9 +575,7 @@ class StandardArray(IgniteDataType):
         return result
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
+    def from_python_not_null(cls, stream, value):
         header_class = cls.build_header_class()
         header = header_class()
         if hasattr(header, 'type_code'):
@@ -652,11 +585,10 @@ class StandardArray(IgniteDataType):
             )
         length = len(value)
         header.length = length
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += cls.standard_type.from_python(x)
-        return bytes(buffer)
+            cls.standard_type.from_python(stream, x)
 
 
 class StringArray(StandardArray):
@@ -804,10 +736,7 @@ class EnumArrayObject(StandardArrayObject):
         )
 
     @classmethod
-    def from_python(cls, value):
-        if value is None:
-            return Null.from_python()
-
+    def from_python_not_null(cls, stream, value):
         type_id, value = value
         header_class = cls.build_header_class()
         header = header_class()
@@ -819,11 +748,10 @@ class EnumArrayObject(StandardArrayObject):
         length = len(value)
         header.length = length
         header.type_id = type_id
-        buffer = bytearray(header)
 
+        stream.write(header)
         for x in value:
-            buffer += cls.standard_type.from_python(x)
-        return bytes(buffer)
+            cls.standard_type.from_python(stream, x)
 
     @classmethod
     def to_python(cls, ctype_object, *args, **kwargs):
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 69b6fa2..5bd114b 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -21,6 +21,7 @@ from pyignite.api.result import APIResult
 from pyignite.connection import Connection
 from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED
 from pyignite.queries.response import Response, SQLResponse
+from pyignite.stream import BinaryStream, READ_BACKWARD
 
 
 @attr.s
@@ -47,31 +48,28 @@ class Query:
             )
         return cls._query_c_type
 
-    def _build_header(self, buffer: bytearray, values: dict):
+    def _build_header(self, stream, values: dict):
         header_class = self.build_c_type()
+        header_len = ctypes.sizeof(header_class)
+        init_pos = stream.tell()
+        stream.seek(init_pos + header_len)
+
         header = header_class()
         header.op_code = self.op_code
         if self.query_id is None:
             header.query_id = randint(MIN_LONG, MAX_LONG)
 
         for name, c_type in self.following:
-            buffer += c_type.from_python(values[name])
+            c_type.from_python(stream, values[name])
 
-        header.length = (
-                len(buffer)
-                + ctypes.sizeof(header_class)
-                - ctypes.sizeof(ctypes.c_int)
-        )
+        header.length = stream.tell() - init_pos - ctypes.sizeof(ctypes.c_int)
+        stream.seek(init_pos)
 
         return header
 
-    def from_python(self, values: dict = None):
-        if values is None:
-            values = {}
-        buffer = bytearray()
-        header = self._build_header(buffer, values)
-        buffer[:0] = bytes(header)
-        return header.query_id, bytes(buffer)
+    def from_python(self, stream, values: dict = None):
+        header = self._build_header(stream, values if values else {})
+        stream.write(header)
 
     def perform(
         self, conn: Connection, query_params: dict = None,
@@ -89,8 +87,9 @@ class Query:
         :return: instance of :class:`~pyignite.api.result.APIResult` with raw
          value (may undergo further processing in API functions).
         """
-        _, send_buffer = self.from_python(query_params)
-        conn.send(send_buffer)
+        with BinaryStream(conn) as stream:
+            self.from_python(stream, query_params)
+            conn.send(stream.getbuffer())
 
         if sql:
             response_struct = SQLResponse(protocol_version=conn.get_protocol_version(),
@@ -99,8 +98,9 @@ class Query:
             response_struct = Response(protocol_version=conn.get_protocol_version(),
                                        following=response_config)
 
-        response_ctype, recv_buffer = response_struct.parse(conn)
-        response = response_ctype.from_buffer_copy(recv_buffer)
+        with BinaryStream(conn, conn.recv()) as stream:
+            response_ctype = response_struct.parse(stream)
+            response = stream.read_ctype(response_ctype, direction=READ_BACKWARD)
 
         # this test depends on protocol version
         if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED:
@@ -140,7 +140,7 @@ class ConfigQuery(Query):
             )
         return cls._query_c_type
 
-    def _build_header(self, buffer: bytearray, values: dict):
-        header = super()._build_header(buffer, values)
+    def _build_header(self, stream, values: dict):
+        header = super()._build_header(stream, values)
         header.config_length = header.length - ctypes.sizeof(type(header))
         return header
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index 05a519a..016f577 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from io import SEEK_CUR
 
 import attr
 from collections import OrderedDict
@@ -21,6 +22,7 @@ from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR
 from pyignite.connection import Connection
 from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct
 from pyignite.queries.op_codes import OP_SUCCESS
+from pyignite.stream import READ_BACKWARD
 
 
 @attr.s
@@ -55,12 +57,14 @@ class Response:
             )
         return self._response_header
 
-    def parse(self, conn: Connection):
+    def parse(self, stream):
+        init_pos = stream.tell()
         header_class = self.build_header()
-        buffer = bytearray(conn.recv(ctypes.sizeof(header_class)))
-        header = header_class.from_buffer_copy(buffer)
-        fields = []
+        header_len = ctypes.sizeof(header_class)
+        header = stream.read_ctype(header_class)
+        stream.seek(header_len, SEEK_CUR)
 
+        fields = []
         has_error = False
         if self.protocol_version and self.protocol_version >= (1, 4, 0):
             if header.flags & RHF_TOPOLOGY_CHANGED:
@@ -76,20 +80,19 @@ class Response:
             has_error = header.status_code != OP_SUCCESS
 
         if fields:
-            buffer += conn.recv(
-                sum([ctypes.sizeof(c_type) for _, c_type in fields])
-            )
+            stream.seek(sum(ctypes.sizeof(c_type) for _, c_type in fields), SEEK_CUR)
 
         if has_error:
-            msg_type, buffer_fragment = String.parse(conn)
-            buffer += buffer_fragment
+            msg_type = String.parse(stream)
             fields.append(('error_message', msg_type))
         else:
-            self._parse_success(conn, buffer, fields)
+            self._parse_success(stream, fields)
 
-        return self._create_parse_result(conn, header_class, fields, buffer)
+        response_class = self._create_response_class(stream, header_class, fields)
+        stream.seek(init_pos + ctypes.sizeof(response_class))
+        return self._create_response_class(stream, header_class, fields)
 
-    def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray):
+    def _create_response_class(self, stream, header_class, fields: list):
         response_class = type(
             'Response',
             (header_class,),
@@ -98,12 +101,11 @@ class Response:
                 '_fields_': fields,
             }
         )
-        return response_class, bytes(buffer)
+        return response_class
 
-    def _parse_success(self, conn: Connection, buffer: bytearray, fields: list):
+    def _parse_success(self, stream, fields: list):
         for name, ignite_type in self.following:
-            c_type, buffer_fragment = ignite_type.parse(conn)
-            buffer += buffer_fragment
+            c_type = ignite_type.parse(stream)
             fields.append((name, c_type))
 
     def to_python(self, ctype_object, *args, **kwargs):
@@ -134,7 +136,7 @@ class SQLResponse(Response):
             return 'fields', StringArray
         return 'field_count', Int
 
-    def _parse_success(self, conn: Connection, buffer: bytearray, fields: list):
+    def _parse_success(self, stream, fields: list):
         following = [
             self.fields_or_field_count(),
             ('row_count', Int),
@@ -142,9 +144,8 @@ class SQLResponse(Response):
         if self.has_cursor:
             following.insert(0, ('cursor', Long))
         body_struct = Struct(following)
-        body_class, body_buffer = body_struct.parse(conn)
-        body = body_class.from_buffer_copy(body_buffer)
-        buffer += body_buffer
+        body_class = body_struct.parse(stream)
+        body = stream.read_ctype(body_class, direction=READ_BACKWARD)
 
         if self.include_field_names:
             field_count = body.fields.length
@@ -155,9 +156,8 @@ class SQLResponse(Response):
         for i in range(body.row_count):
             row_fields = []
             for j in range(field_count):
-                field_class, field_buffer = AnyDataObject.parse(conn)
+                field_class = AnyDataObject.parse(stream)
                 row_fields.append(('column_{}'.format(j), field_class))
-                buffer += field_buffer
 
             row_class = type(
                 'SQLResponseRow',
@@ -182,7 +182,7 @@ class SQLResponse(Response):
             ('more', ctypes.c_byte),
         ]
 
-    def _create_parse_result(self, conn: Connection, header_class, fields: list, buffer: bytearray):
+    def _create_response_class(self, stream, header_class, fields: list):
         final_class = type(
             'SQLResponse',
             (header_class,),
@@ -191,8 +191,7 @@ class SQLResponse(Response):
                 '_fields_': fields,
             }
         )
-        buffer += conn.recv(ctypes.sizeof(final_class) - len(buffer))
-        return final_class, bytes(buffer)
+        return final_class
 
     def to_python(self, ctype_object, *args, **kwargs):
         if getattr(ctype_object, 'status_code', 0) == 0:
diff --git a/pyignite/datatypes/__init__.py b/pyignite/stream/__init__.py
similarity index 72%
copy from pyignite/datatypes/__init__.py
copy to pyignite/stream/__init__.py
index 5024f79..94153b4 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/stream/__init__.py
@@ -13,15 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-This module contains classes, used internally by `pyignite` for parsing and
-creating binary data.
-"""
-
-from .complex import *
-from .internal import *
-from .null_object import *
-from .primitive import *
-from .primitive_arrays import *
-from .primitive_objects import *
-from .standard import *
+from .binary_stream import BinaryStream, READ_FORWARD, READ_BACKWARD
\ No newline at end of file
diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py
new file mode 100644
index 0000000..1ecdcfb
--- /dev/null
+++ b/pyignite/stream/binary_stream.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import ctypes
+from io import BytesIO
+
+import pyignite.utils as ignite_utils
+
+READ_FORWARD = 0
+READ_BACKWARD = 1
+
+
+class BinaryStream:
+    def __init__(self, conn, buf=None):
+        """
+        Initialize binary stream around buffers.
+
+        :param buf: Buffer, optional parameter. If not passed, creates empty BytesIO.
+        :param conn: Connection instance, required.
+        """
+        from pyignite.connection import Connection
+
+        if not isinstance(conn, Connection):
+            raise TypeError(f"invalid parameter: expected instance of {Connection}")
+
+        if buf and not isinstance(buf, (bytearray, bytes, memoryview)):
+            raise TypeError(f"invalid parameter: expected bytes-like object")
+
+        self.conn = conn
+        self.stream = BytesIO(buf) if buf else BytesIO()
+
+    @property
+    def compact_footer(self) -> bool:
+        return self.conn.client.compact_footer
+
+    @compact_footer.setter
+    def compact_footer(self, value: bool):
+        self.conn.client.compact_footer = value
+
+    def read(self, size):
+        buf = bytearray(size)
+        self.stream.readinto(buf)
+        return buf
+
+    def read_ctype(self, ctype_class, position=None, direction=READ_FORWARD):
+        ctype_len = ctypes.sizeof(ctype_class)
+
+        if position is not None and position >= 0:
+            init_position = position
+        else:
+            init_position = self.tell()
+
+        if direction == READ_FORWARD:
+            start, end = init_position, init_position + ctype_len
+        else:
+            start, end = init_position - ctype_len, init_position
+
+        buf = self.stream.getbuffer()[start:end]
+        return ctype_class.from_buffer_copy(buf)
+
+    def write(self, buf):
+        return self.stream.write(buf)
+
+    def tell(self):
+        return self.stream.tell()
+
+    def seek(self, *args, **kwargs):
+        return self.stream.seek(*args, **kwargs)
+
+    def getvalue(self):
+        return self.stream.getvalue()
+
+    def getbuffer(self):
+        return self.stream.getbuffer()
+
+    def mem_view(self, start=-1, offset=0):
+        start = start if start >= 0 else self.tell()
+        return self.stream.getbuffer()[start:start+offset]
+
+    def hashcode(self, start, bytes_len):
+        return ignite_utils.hashcode(self.stream.getbuffer()[start:start+bytes_len])
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.stream.close()
+
+    def get_dataclass(self, header):
+        # get field names from outer space
+        result = self.conn.client.query_binary_type(
+            header.type_id,
+            header.schema_id
+        )
+        if not result:
+            raise RuntimeError('Binary type is not registered')
+        return result
+
+    def register_binary_type(self, *args, **kwargs):
+        return self.conn.client.register_binary_type(*args, **kwargs)
diff --git a/pyignite/utils.py b/pyignite/utils.py
index ef7b6f6..3d0378f 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -19,7 +19,7 @@ import warnings
 
 from functools import wraps
 from threading import Event, Thread
-from typing import Any, Callable, Optional, Type, Tuple, Union
+from typing import Any, Optional, Type, Tuple, Union
 
 from pyignite.datatypes.base import IgniteDataType
 from .constants import *
@@ -85,29 +85,7 @@ def int_overflow(value: int) -> int:
     return ((value ^ 0x80000000) & 0xffffffff) - 0x80000000
 
 
-def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
-    """
-    Unwrap wrapped BinaryObject and convert it to Python data.
-
-    :param client: connection to Ignite cluster,
-    :param wrapped: `WrappedDataObject` value,
-    :return: dict representing wrapped BinaryObject.
-    """
-    from pyignite.datatypes.complex import BinaryObject
-
-    blob, offset = wrapped
-    conn_clone = client.random_node.clone(prefetch=blob)
-    conn_clone.pos = offset
-    data_class, data_bytes = BinaryObject.parse(conn_clone)
-    result = BinaryObject.to_python(
-        data_class.from_buffer_copy(data_bytes),
-        client,
-    )
-    conn_clone.close()
-    return result
-
-
-def hashcode(data: Union[str, bytes]) -> int:
+def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:
     """
     Calculate hash code used for identifying objects in Ignite binary API.
 
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 327f501..893928e 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -4,3 +4,4 @@ pytest==3.6.1
 pytest-cov==2.5.1
 teamcity-messages==1.21
 psutil==5.6.5
+jinja2==2.11.3
diff --git a/tests/config/ignite-config-ssl.xml b/tests/config/ignite-config-ssl.xml
deleted file mode 100644
index 827405c..0000000
--- a/tests/config/ignite-config-ssl.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<beans xmlns="http://www.springframework.org/schema/beans"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="
-        http://www.springframework.org/schema/beans
-        http://www.springframework.org/schema/beans/spring-beans.xsd">
-    <import resource="ignite-config-base.xml"/>
-
-    <bean parent="grid.cfg">
-        <property name="connectorConfiguration"><null/></property>
-
-        <property name="clientConnectorConfiguration">
-            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
-                <property name="host" value="127.0.0.1"/>
-                <property name="port" value="${IGNITE_CLIENT_PORT}"/>
-                <property name="portRange" value="0"/>
-                <property name="sslEnabled" value="true"/>
-                <property name="useIgniteSslContextFactory" value="false"/>
-                <property name="sslClientAuth" value="true"/>
-
-                <!-- Provide Ssl context. -->
-                <property name="sslContextFactory">
-                    <bean class="org.apache.ignite.ssl.SslContextFactory">
-                        <property name="keyStoreFilePath" value="config/ssl/server.jks"/>
-                        <property name="keyStorePassword" value="123456"/>
-                        <property name="trustStoreFilePath" value="config/ssl/trust.jks"/>
-                        <property name="trustStorePassword" value="123456"/>
-                    </bean>
-                </property>
-            </bean>
-        </property>
-    </bean>
-</beans>
\ No newline at end of file
diff --git a/tests/config/ignite-config.xml b/tests/config/ignite-config.xml
deleted file mode 100644
index 09fba2c..0000000
--- a/tests/config/ignite-config.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<beans xmlns="http://www.springframework.org/schema/beans"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xmlns:util="http://www.springframework.org/schema/util"
-       xsi:schemaLocation="
-        http://www.springframework.org/schema/beans
-        http://www.springframework.org/schema/beans/spring-beans.xsd
-        http://www.springframework.org/schema/util
-        http://www.springframework.org/schema/util/spring-util.xsd">
-    <import resource="ignite-config-base.xml"/>
-
-    <bean parent="grid.cfg">
-        <property name="clientConnectorConfiguration">
-            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
-                <property name="host" value="127.0.0.1"/>
-                <property name="port" value="${IGNITE_CLIENT_PORT}"/>
-                <property name="portRange" value="0"/>
-            </bean>
-        </property>
-    </bean>
-</beans>
diff --git a/tests/config/ignite-config-base.xml b/tests/config/ignite-config.xml.jinja2
similarity index 65%
rename from tests/config/ignite-config-base.xml
rename to tests/config/ignite-config.xml.jinja2
index 7487618..322a958 100644
--- a/tests/config/ignite-config-base.xml
+++ b/tests/config/ignite-config.xml.jinja2
@@ -26,12 +26,35 @@
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd">
 
-    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
-        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK"/>
-        <property name="searchSystemEnvironment" value="true"/>
-    </bean>
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        {% if use_ssl %}
+            <property name="connectorConfiguration"><null/></property>
+        {% endif %}
+
+        <property name="clientConnectorConfiguration">
+            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+                <property name="host" value="127.0.0.1"/>
+                <property name="port" value="{{ ignite_client_port }}"/>
+                <property name="portRange" value="0"/>
+                {% if use_ssl %}
+                    <property name="sslEnabled" value="true"/>
+                    <property name="useIgniteSslContextFactory" value="false"/>
+                    <property name="sslClientAuth" value="true"/>
+
+                    <property name="sslContextFactory">
+                        <bean class="org.apache.ignite.ssl.SslContextFactory">
+                            <property name="keyStoreFilePath" value="config/ssl/server.jks"/>
+                            <property name="keyStorePassword" value="123456"/>
+                            <property name="trustStoreFilePath" value="config/ssl/trust.jks"/>
+                            <property name="trustStorePassword" value="123456"/>
+                        </bean>
+                    </property>
+                {% endif %}
+            </bean>
+        </property>
+
+        <property name="consistentId" value="srv_{{  ignite_instance_idx  }}"/>
 
-    <bean id="grid.cfg" abstract="true" class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="localHost" value="127.0.0.1"/>
 
         <property name="discoverySpi">
@@ -42,7 +65,7 @@
                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                         <property name="addresses">
                             <list>
-                                <value>127.0.0.1:48500..48503</value>
+                                <value>127.0.0.1:48500..48510</value>
                             </list>
                         </property>
                     </bean>
@@ -69,9 +92,9 @@
             </list>
         </property>
 
-         <property name="gridLogger">
+        <property name="gridLogger">
             <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
-              <constructor-arg type="java.lang.String" value="config/log4j.xml"/>
+              <constructor-arg type="java.lang.String" value="config/log4j-{{ ignite_instance_idx }}.xml"/>
             </bean>
         </property>
     </bean>
diff --git a/tests/config/log4j.xml b/tests/config/log4j.xml.jinja2
similarity index 90%
rename from tests/config/log4j.xml
rename to tests/config/log4j.xml.jinja2
index f5562d0..628f66c 100644
--- a/tests/config/log4j.xml
+++ b/tests/config/log4j.xml.jinja2
@@ -23,8 +23,8 @@
             <PatternLayout pattern="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
         </Console>
         <RollingFile name="FILE" append="true"
-                     filePattern="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}-%i.log.gz"
-                     fileName="logs/ignite-log-${env:IGNITE_INSTANCE_INDEX}.txt">
+                     filePattern="logs/ignite-log-{{ ignite_instance_idx }}-%i.txt"
+                     fileName="logs/ignite-log-{{ ignite_instance_idx }}.txt">
             <PatternLayout pattern="%m%n"/>
             <Policies>
                 <SizeBasedTriggeringPolicy size="10MB" />
diff --git a/tests/conftest.py b/tests/conftest.py
index 9974b16..54a7fda 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -22,7 +22,7 @@ import pytest
 from pyignite import Client
 from pyignite.constants import *
 from pyignite.api import cache_create, cache_destroy
-from tests.util import _start_ignite, start_ignite_gen, get_request_grid_idx
+from tests.util import _start_ignite, start_ignite_gen
 
 
 class BoolParser(argparse.Action):
@@ -134,12 +134,6 @@ def cache(client):
     cache_destroy(conn, cache_name)
 
 
-@pytest.fixture(autouse=True)
-def log_init():
-    # Init log call timestamp
-    get_request_grid_idx()
-
-
 @pytest.fixture(scope='module')
 def start_client(use_ssl, ssl_keyfile, ssl_keyfile_password, ssl_certfile, ssl_ca_certfile, ssl_cert_reqs, ssl_ciphers,
                  ssl_version,username, password):
diff --git a/tests/test_affinity_request_routing.py b/tests/test_affinity_request_routing.py
index eb46ab6..cd0c015 100644
--- a/tests/test_affinity_request_routing.py
+++ b/tests/test_affinity_request_routing.py
@@ -13,18 +13,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import OrderedDict
-
+from collections import OrderedDict, deque
 import pytest
 
 from pyignite import *
+from pyignite.connection import Connection
 from pyignite.datatypes import *
 from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import *
 from tests.util import *
 
 
-@pytest.mark.parametrize("key,grid_idx", [(1, 3), (2, 1), (3, 1), (4, 3), (5, 1), (6, 3), (11, 2), (13, 2), (19, 2)])
+requests = deque()
+old_send = Connection.send
+
+
+def patched_send(self, *args, **kwargs):
+    """Patched send function that push to queue idx of server to which request is routed."""
+    requests.append(self.port % 100)
+    return old_send(self, *args, **kwargs)
+
+
+def setup_function():
+    requests.clear()
+    Connection.send = patched_send
+
+
+def teardown_function():
+    Connection.send = old_send
+
+
+def wait_for_affinity_distribution(cache, key, node_idx, timeout=30):
+    real_node_idx = 0
+
+    def check_grid_idx():
+        nonlocal real_node_idx
+        try:
+            cache.get(key)
+            real_node_idx = requests.pop()
+        except (OSError, IOError):
+            return False
+        return real_node_idx == node_idx
+
+    res = wait_for_condition(check_grid_idx, timeout=timeout)
+
+    if not res:
+        raise TimeoutError(f"failed to wait for affinity distribution, expected node_idx {node_idx},"
+                           f"got {real_node_idx} instead")
+
+
+@pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)])
 @pytest.mark.parametrize("backups", [0, 1, 2, 3])
 def test_cache_operation_on_primitive_key_routes_request_to_primary_node(
         request, key, grid_idx, backups, client_partition_aware):
@@ -34,52 +72,51 @@ def test_cache_operation_on_primitive_key_routes_request_to_primary_node(
         PROP_BACKUPS_NUMBER: backups,
     })
 
-    # Warm up affinity map
     cache.put(key, key)
-    get_request_grid_idx()
+    wait_for_affinity_distribution(cache, key, grid_idx)
 
     # Test
     cache.get(key)
-    assert get_request_grid_idx() == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.put(key, key)
-    assert get_request_grid_idx("Put") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.replace(key, key + 1)
-    assert get_request_grid_idx("Replace") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.clear_key(key)
-    assert get_request_grid_idx("ClearKey") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.contains_key(key)
-    assert get_request_grid_idx("ContainsKey") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.get_and_put(key, 3)
-    assert get_request_grid_idx("GetAndPut") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.get_and_put_if_absent(key, 4)
-    assert get_request_grid_idx("GetAndPutIfAbsent") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.put_if_absent(key, 5)
-    assert get_request_grid_idx("PutIfAbsent") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.get_and_remove(key)
-    assert get_request_grid_idx("GetAndRemove") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.get_and_replace(key, 6)
-    assert get_request_grid_idx("GetAndReplace") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.remove_key(key)
-    assert get_request_grid_idx("RemoveKey") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.remove_if_equals(key, -1)
-    assert get_request_grid_idx("RemoveIfEquals") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.replace(key, -1)
-    assert get_request_grid_idx("Replace") == grid_idx
+    assert requests.pop() == grid_idx
 
     cache.replace_if_equals(key, 10, -10)
-    assert get_request_grid_idx("ReplaceIfEquals") == grid_idx
+    assert requests.pop() == grid_idx
 
 
 @pytest.mark.skip(reason="Custom key objects are not supported yet")
@@ -121,31 +158,28 @@ def test_cache_operation_on_custom_affinity_key_routes_request_to_primary_node(
     cache.put(key_obj, 1)
     cache.put(key_obj, 2)
 
-    assert get_request_grid_idx("Put") == grid_idx
+    assert requests.pop() == grid_idx
 
 
-@pytest.mark.skip("https://issues.apache.org/jira/browse/IGNITE-13967")
 def test_cache_operation_routed_to_new_cluster_node(request, start_ignite_server, start_client):
     client = start_client(partition_aware=True)
     client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802), ("127.0.0.1", 10803), ("127.0.0.1", 10804)])
     cache = client.get_or_create_cache(request.node.name)
     key = 12
+    wait_for_affinity_distribution(cache, key, 3)
     cache.put(key, key)
     cache.put(key, key)
-    assert get_request_grid_idx("Put") == 3
+    assert requests.pop() == 3
 
     srv = start_ignite_server(4)
     try:
         # Wait for rebalance and partition map exchange
-        def check_grid_idx():
-            cache.get(key)
-            return get_request_grid_idx() == 4
-        wait_for_condition(check_grid_idx)
+        wait_for_affinity_distribution(cache, key, 4)
 
         # Response is correct and comes from the new node
         res = cache.get_and_remove(key)
         assert res == key
-        assert get_request_grid_idx("GetAndRemove") == 4
+        assert requests.pop() == 4
     finally:
         kill_process_tree(srv.pid)
 
@@ -167,13 +201,13 @@ def verify_random_node(cache):
     key = 1
     cache.put(key, key)
 
-    idx1 = get_request_grid_idx("Put")
+    idx1 = requests.pop()
     idx2 = idx1
 
     # Try 10 times - random node may end up being the same
     for _ in range(1, 10):
         cache.put(key, key)
-        idx2 = get_request_grid_idx("Put")
+        idx2 = requests.pop()
         if idx2 != idx1:
             break
     assert idx1 != idx2
diff --git a/tests/test_affinity_single_connection.py b/tests/test_affinity_single_connection.py
index c40393c..1943384 100644
--- a/tests/test_affinity_single_connection.py
+++ b/tests/test_affinity_single_connection.py
@@ -13,10 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pytest
-
-from tests.util import get_request_grid_idx
-
 
 def test_all_cache_operations_with_partition_aware_client_on_single_server(request, client_partition_aware_single_server):
     cache = client_partition_aware_single_server.get_or_create_cache(request.node.name)
diff --git a/tests/test_cache_composite_key_class_sql.py b/tests/test_cache_composite_key_class_sql.py
index 2f1705f..989a229 100644
--- a/tests/test_cache_composite_key_class_sql.py
+++ b/tests/test_cache_composite_key_class_sql.py
@@ -111,13 +111,12 @@ def test_python_sql_finds_inserted_value_with_composite_key(client):
 
 
 def validate_query_result(student_key, student_val, query_result):
-    '''
+    """
     Compare query result with expected key and value.
-    '''
+    """
     assert len(query_result) == 2
     sql_row = dict(zip(query_result[0], query_result[1]))
 
-    assert sql_row["_KEY"][0] == student_key._buffer
     assert sql_row['ID'] == student_key.ID
     assert sql_row['DEPT'] == student_key.DEPT
     assert sql_row['NAME'] == student_val.NAME
diff --git a/tests/test_sql.py b/tests/test_sql.py
index 15f84ee..c896afb 100644
--- a/tests/test_sql.py
+++ b/tests/test_sql.py
@@ -22,7 +22,9 @@ from pyignite.api import (
 )
 from pyignite.datatypes.prop_codes import *
 from pyignite.exceptions import SQLError
-from pyignite.utils import entity_id, unwrap_binary
+from pyignite.utils import entity_id
+from pyignite.binary import unwrap_binary
+
 
 initial_data = [
         ('John', 'Doe', 5),
diff --git a/tests/util.py b/tests/util.py
index 1d6acd6..90f0146 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -15,6 +15,8 @@
 
 import glob
 import os
+
+import jinja2 as jinja2
 import psutil
 import re
 import signal
@@ -72,22 +74,19 @@ def get_ignite_config_path(use_ssl=False):
     if use_ssl:
         file_name = "ignite-config-ssl.xml"
     else:
-        file_name = "ignite-config.xml"
+        file_name = "ignite-config.xml.jinja2"
 
     return os.path.join(get_test_dir(), "config", file_name)
 
 
 def check_server_started(idx=1):
-    log_file = os.path.join(get_test_dir(), "logs", f"ignite-log-{idx}.txt")
-    if not os.path.exists(log_file):
-        return False
-
     pattern = re.compile('^Topology snapshot.*')
 
-    with open(log_file) as f:
-        for line in f.readlines():
-            if pattern.match(line):
-                return True
+    for log_file in get_log_files(idx):
+        with open(log_file) as f:
+            for line in f.readlines():
+                if pattern.match(line):
+                    return True
 
     return False
 
@@ -102,20 +101,33 @@ def kill_process_tree(pid):
         os.kill(pid, signal.SIGKILL)
 
 
+templateLoader = jinja2.FileSystemLoader(searchpath=os.path.join(get_test_dir(), "config"))
+templateEnv = jinja2.Environment(loader=templateLoader)
+
+
+def create_config_file(tpl_name, file_name, **kwargs):
+    template = templateEnv.get_template(tpl_name)
+    with open(os.path.join(get_test_dir(), "config", file_name), mode='w') as f:
+        f.write(template.render(**kwargs))
+
+
 def _start_ignite(idx=1, debug=False, use_ssl=False):
     clear_logs(idx)
 
     runner = get_ignite_runner()
 
     env = os.environ.copy()
-    env['IGNITE_INSTANCE_INDEX'] = str(idx)
-    env['IGNITE_CLIENT_PORT'] = str(10800 + idx)
 
     if debug:
         env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
                           "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
 
-    ignite_cmd = [runner, get_ignite_config_path(use_ssl)]
+    params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl}
+
+    create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params)
+    create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params)
+
+    ignite_cmd = [runner, os.path.join(get_test_dir(), "config", f'ignite-config-{idx}.xml')]
     print("Starting Ignite server node:", ignite_cmd)
 
     srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
@@ -142,38 +154,3 @@ def get_log_files(idx=1):
 def clear_logs(idx=1):
     for f in get_log_files(idx):
         os.remove(f)
-
-
-def read_log_file(file, idx):
-    i = -1
-    with open(file) as f:
-        lines = f.readlines()
-        for line in lines:
-            i += 1
-
-            if i < read_log_file.last_line[idx]:
-                continue
-
-            if i > read_log_file.last_line[idx]:
-                read_log_file.last_line[idx] = i
-
-            # Example: Client request received [reqId=1, addr=/127.0.0.1:51694,
-            # req=org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest@1f33101e]
-            res = re.match("Client request received .*?req=org.apache.ignite.internal.processors."
-                           "platform.client.cache.ClientCache([a-zA-Z]+)Request@", line)
-
-            if res is not None:
-                yield res.group(1)
-
-
-def get_request_grid_idx(message="Get"):
-    res = -1
-    for i in range(1, 5):
-        for log_file in get_log_files(i):
-            for log in read_log_file(log_file, i):
-                if log == message:
-                    res = i  # Do not exit early to advance all log positions
-    return res
-
-
-read_log_file.last_line = [0, 0, 0, 0, 0]
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 69db226..4361413 100644
--- a/tox.ini
+++ b/tox.ini
@@ -34,6 +34,10 @@ usedevelop = True
 commands =
     pytest {env:PYTESTARGS:} {posargs}
 
+[jenkins]
+setenv:
+    PYTESTARGS = --junitxml=junit-{envname}.xml
+
 [no-ssl]
 setenv:
     PYTEST_ADDOPTS = --examples
@@ -54,3 +58,18 @@ setenv: {[ssl]setenv}
 
 [testenv:py{36,37,38}-ssl-password]
 setenv: {[ssl-password]setenv}
+
+[testenv:py{36,37,38}-jenkins-no-ssl]
+setenv:
+    {[no-ssl]setenv}
+    {[jenkins]setenv}
+
+[testenv:py{36,37,38}-jenkins-ssl]
+setenv:
+    {[ssl]setenv}
+    {[jenkins]setenv}
+
+[testenv:py{36,37,38}-jenkins-ssl-password]
+setenv:
+    {[ssl-password]setenv}
+    {[jenkins]setenv}