You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@ignite.apache.org by is...@apache.org on 2021/04/05 11:16:09 UTC

[ignite-python-thin-client] branch master updated: IGNITE-14472 Multiple performance improvements

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new e48f4be  IGNITE-14472 Multiple performance improvements
e48f4be is described below

commit e48f4bea7f91325ad1e07056c9d236008b91ee7e
Author: Ivan Dashchinskiy <iv...@gmail.com>
AuthorDate: Mon Apr 5 14:14:49 2021 +0300

    IGNITE-14472 Multiple performance improvements
    
    This closes #28
---
 pyignite/binary.py                    |  2 +-
 pyignite/connection/aio_connection.py | 42 +++++++++++++++++------
 pyignite/connection/connection.py     | 59 +++++++++++++++++++-------------
 pyignite/datatypes/internal.py        | 64 +++++++++++++++++------------------
 pyignite/datatypes/null_object.py     |  2 +-
 pyignite/datatypes/standard.py        |  2 +-
 pyignite/queries/query.py             |  4 +--
 pyignite/queries/response.py          | 48 +++++++++++++-------------
 pyignite/stream/binary_stream.py      | 51 +++++++++++++++++-----------
 9 files changed, 159 insertions(+), 115 deletions(-)

diff --git a/pyignite/binary.py b/pyignite/binary.py
index 4e34267..5a5f895 100644
--- a/pyignite/binary.py
+++ b/pyignite/binary.py
@@ -201,7 +201,7 @@ class GenericObjectMeta(GenericObjectPropsMeta):
             stream.write(schema)
 
             if save_to_buf:
-                obj._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
+                obj._buffer = stream.slice(initial_pos, stream.tell() - initial_pos)
             obj._hashcode = header.hash_code
 
         def _setattr(self, attr_name: str, attr_value: Any):
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index ce32592..020f8d4 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -158,7 +158,7 @@ class AioConnection(BaseConnection):
 
         with AioBinaryStream(self.client) as stream:
             await hs_request.from_python_async(stream)
-            await self._send(stream.getbuffer(), reconnect=False)
+            await self._send(stream.getvalue(), reconnect=False)
 
         with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
             hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
@@ -185,7 +185,7 @@ class AioConnection(BaseConnection):
         except connection_errors:
             pass
 
-    async def request(self, data: Union[bytes, bytearray, memoryview]) -> bytearray:
+    async def request(self, data: Union[bytes, bytearray]) -> bytearray:
         """
         Perform request.
 
@@ -195,7 +195,7 @@ class AioConnection(BaseConnection):
             await self._send(data)
             return await self._recv()
 
-    async def _send(self, data: Union[bytes, bytearray, memoryview], reconnect=True):
+    async def _send(self, data: Union[bytes, bytearray], reconnect=True):
         if self.closed:
             raise SocketError('Attempt to use closed connection.')
 
@@ -212,21 +212,43 @@ class AioConnection(BaseConnection):
         if self.closed:
             raise SocketError('Attempt to use closed connection.')
 
-        with BytesIO() as stream:
+        data = bytearray(1024)
+        buffer = memoryview(data)
+        bytes_total_received, bytes_to_receive = 0, 0
+        while True:
             try:
-                buf = await self._reader.readexactly(4)
-                response_len = int.from_bytes(buf, PROTOCOL_BYTE_ORDER)
+                chunk = await self._reader.read(len(buffer))
+                bytes_received = len(chunk)
+                if bytes_received == 0:
+                    raise SocketError('Connection broken.')
 
-                stream.write(buf)
-
-                stream.write(await self._reader.readexactly(response_len))
+                buffer[0:bytes_received] = chunk
+                bytes_total_received += bytes_received
             except connection_errors:
                 self.failed = True
                 if reconnect:
                     await self._reconnect()
                 raise
 
-            return bytearray(stream.getbuffer())
+            if bytes_total_received < 4:
+                continue
+            elif bytes_to_receive == 0:
+                response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
+                bytes_to_receive = response_len
+
+                if response_len + 4 > len(data):
+                    buffer.release()
+                    data.extend(bytearray(response_len + 4 - len(data)))
+                    buffer = memoryview(data)[bytes_total_received:]
+                    continue
+
+            if bytes_total_received >= bytes_to_receive:
+                buffer.release()
+                break
+
+            buffer = buffer[bytes_received:]
+
+        return data
 
     async def close(self):
         async with self._mux:
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index 7d5778c..e8437dc 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -212,7 +212,7 @@ class Connection(BaseConnection):
 
         with BinaryStream(self.client) as stream:
             hs_request.from_python(stream)
-            self.send(stream.getbuffer(), reconnect=False)
+            self.send(stream.getvalue(), reconnect=False)
 
         with BinaryStream(self.client, self.recv(reconnect=False)) as stream:
             hs_response = HandshakeResponse.parse(stream, self.protocol_context)
@@ -235,7 +235,7 @@ class Connection(BaseConnection):
         except connection_errors:
             pass
 
-    def request(self, data: Union[bytes, bytearray, memoryview], flags=None) -> bytearray:
+    def request(self, data: Union[bytes, bytearray], flags=None) -> bytearray:
         """
         Perform request.
 
@@ -245,7 +245,7 @@ class Connection(BaseConnection):
         self.send(data, flags=flags)
         return self.recv()
 
-    def send(self, data: Union[bytes, bytearray, memoryview], flags=None, reconnect=True):
+    def send(self, data: Union[bytes, bytearray], flags=None, reconnect=True):
         """
         Send data down the socket.
 
@@ -275,22 +275,6 @@ class Connection(BaseConnection):
         :param flags: (optional) OS-specific flags.
         :param reconnect: (optional) reconnect on failure, default True.
         """
-        def _recv(buffer, num_bytes):
-            bytes_to_receive = num_bytes
-            while bytes_to_receive > 0:
-                try:
-                    bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs)
-                    if bytes_rcvd == 0:
-                        raise SocketError('Connection broken.')
-                except connection_errors:
-                    self.failed = True
-                    if reconnect:
-                        self.reconnect()
-                    raise
-
-                buffer = buffer[bytes_rcvd:]
-                bytes_to_receive -= bytes_rcvd
-
         if self.closed:
             raise SocketError('Attempt to use closed connection.')
 
@@ -298,12 +282,39 @@ class Connection(BaseConnection):
         if flags is not None:
             kwargs['flags'] = flags
 
-        data = bytearray(4)
-        _recv(memoryview(data), 4)
-        response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
+        data = bytearray(1024)
+        buffer = memoryview(data)
+        bytes_total_received, bytes_to_receive = 0, 0
+        while True:
+            try:
+                bytes_received = self._socket.recv_into(buffer, len(buffer), **kwargs)
+                if bytes_received == 0:
+                    raise SocketError('Connection broken.')
+                bytes_total_received += bytes_received
+            except connection_errors:
+                self.failed = True
+                if reconnect:
+                    self.reconnect()
+                raise
+
+            if bytes_total_received < 4:
+                continue
+            elif bytes_to_receive == 0:
+                response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
+                bytes_to_receive = response_len
+
+                if response_len + 4 > len(data):
+                    buffer.release()
+                    data.extend(bytearray(response_len + 4 - len(data)))
+                    buffer = memoryview(data)[bytes_total_received:]
+                    continue
+
+            if bytes_total_received >= bytes_to_receive:
+                buffer.release()
+                break
+
+            buffer = buffer[bytes_received:]
 
-        data.extend(bytearray(response_len))
-        _recv(memoryview(data)[4:], response_len)
         return data
 
     def close(self):
diff --git a/pyignite/datatypes/internal.py b/pyignite/datatypes/internal.py
index 0de50e2..55ed844 100644
--- a/pyignite/datatypes/internal.py
+++ b/pyignite/datatypes/internal.py
@@ -36,7 +36,10 @@ __all__ = [
 from ..stream import READ_BACKWARD
 
 
-def tc_map(key: bytes, _memo_map: dict = {}):
+_tc_map = {}
+
+
+def tc_map(key: bytes):
     """
     Returns a default parser/generator class for the given type code.
 
@@ -49,7 +52,8 @@ def tc_map(key: bytes, _memo_map: dict = {}):
      of the “type code-type class” mapping,
     :return: parser/generator class for the type code.
     """
-    if not _memo_map:
+    global _tc_map
+    if not _tc_map:
         from pyignite.datatypes import (
             Null, ByteObject, ShortObject, IntObject, LongObject, FloatObject,
             DoubleObject, CharObject, BoolObject, UUIDObject, DateObject,
@@ -64,7 +68,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
             MapObject, BinaryObject, WrappedDataObject,
         )
 
-        _memo_map = {
+        _tc_map = {
             TC_NULL: Null,
 
             TC_BYTE: ByteObject,
@@ -110,7 +114,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
             TC_COMPLEX_OBJECT: BinaryObject,
             TC_ARRAY_WRAPPED_OBJECTS: WrappedDataObject,
         }
-    return _memo_map[key]
+    return _tc_map[key]
 
 
 class Conditional:
@@ -183,7 +187,7 @@ class StructArray:
     def __parse_length(self, stream):
         counter_type_len = ctypes.sizeof(self.counter_type)
         length = int.from_bytes(
-            stream.mem_view(offset=counter_type_len),
+            stream.slice(offset=counter_type_len),
             byteorder=PROTOCOL_BYTE_ORDER
         )
         stream.seek(counter_type_len, SEEK_CUR)
@@ -348,6 +352,9 @@ class AnyDataObject:
     """
     _python_map = None
     _python_array_map = None
+    _map_obj_type = None
+    _collection_obj_type = None
+    _binary_obj_type = None
 
     @staticmethod
     def get_subtype(iterable, allow_none=False):
@@ -391,7 +398,7 @@ class AnyDataObject:
 
     @classmethod
     def __data_class_parse(cls, stream):
-        type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte)))
+        type_code = stream.slice(offset=ctypes.sizeof(ctypes.c_byte))
         try:
             return tc_map(type_code)
         except KeyError:
@@ -416,15 +423,17 @@ class AnyDataObject:
         return tc_map(type_code)
 
     @classmethod
-    def _init_python_map(cls):
+    def _init_python_mapping(cls):
         """
         Optimizes Python types→Ignite types map creation for speed.
 
         Local imports seem inevitable here.
         """
         from pyignite.datatypes import (
-            LongObject, DoubleObject, String, BoolObject, Null, UUIDObject,
-            DateObject, TimeObject, DecimalObject, ByteArrayObject,
+            LongObject, DoubleObject, String, BoolObject, Null, UUIDObject, DateObject, TimeObject,
+            DecimalObject, ByteArrayObject, LongArrayObject, DoubleArrayObject, StringArrayObject,
+            BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject, DecimalArrayObject,
+            MapObject, CollectionObject, BinaryObject
         )
 
         cls._python_map = {
@@ -442,17 +451,6 @@ class AnyDataObject:
             decimal.Decimal: DecimalObject,
         }
 
-    @classmethod
-    def _init_python_array_map(cls):
-        """
-        Optimizes  Python types→Ignite array types map creation for speed.
-        """
-        from pyignite.datatypes import (
-            LongArrayObject, DoubleArrayObject, StringArrayObject,
-            BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject,
-            DecimalArrayObject,
-        )
-
         cls._python_array_map = {
             int: LongArrayObject,
             float: DoubleArrayObject,
@@ -466,18 +464,20 @@ class AnyDataObject:
             decimal.Decimal: DecimalArrayObject,
         }
 
+        cls._map_obj_type = MapObject
+        cls._collection_obj_type = CollectionObject
+        cls._binary_obj_type = BinaryObject
+
     @classmethod
     def map_python_type(cls, value):
-        from pyignite.datatypes import (
-            MapObject, CollectionObject, BinaryObject,
-        )
-
-        if cls._python_map is None:
-            cls._init_python_map()
-        if cls._python_array_map is None:
-            cls._init_python_array_map()
+        if cls._python_map is None or cls._python_array_map is None:
+            cls._init_python_mapping()
 
         value_type = type(value)
+
+        if value_type in cls._python_map:
+            return cls._python_map[value_type]
+
         if is_iterable(value) and value_type not in (str, bytearray, bytes):
             value_subtype = cls.get_subtype(value)
             if value_subtype in cls._python_array_map:
@@ -490,7 +490,7 @@ class AnyDataObject:
                 isinstance(value[0], int),
                 isinstance(value[1], dict),
             ]):
-                return MapObject
+                return cls._map_obj_type
 
             if all([
                 value_subtype is None,
@@ -498,7 +498,7 @@ class AnyDataObject:
                 isinstance(value[0], int),
                 is_iterable(value[1]),
             ]):
-                return CollectionObject
+                return cls._collection_obj_type
 
             # no default for ObjectArrayObject, sorry
 
@@ -507,10 +507,8 @@ class AnyDataObject:
             )
 
         if is_binary(value):
-            return BinaryObject
+            return cls._binary_obj_type
 
-        if value_type in cls._python_map:
-            return cls._python_map[value_type]
         raise TypeError(
             'Type `{}` is invalid.'.format(value_type)
         )
diff --git a/pyignite/datatypes/null_object.py b/pyignite/datatypes/null_object.py
index f16034f..8ac47b2 100644
--- a/pyignite/datatypes/null_object.py
+++ b/pyignite/datatypes/null_object.py
@@ -140,7 +140,7 @@ class Nullable(IgniteDataType):
     def __check_null_input(cls, stream):
         type_len = ctypes.sizeof(ctypes.c_byte)
 
-        if stream.mem_view(offset=type_len) == TC_NULL:
+        if stream.slice(offset=type_len) == TC_NULL:
             stream.seek(type_len, SEEK_CUR)
             return True, Null.build_c_type()
 
diff --git a/pyignite/datatypes/standard.py b/pyignite/datatypes/standard.py
index 2b61235..4ca6795 100644
--- a/pyignite/datatypes/standard.py
+++ b/pyignite/datatypes/standard.py
@@ -91,7 +91,7 @@ class String(Nullable):
     @classmethod
     def parse_not_null(cls, stream):
         length = int.from_bytes(
-            stream.mem_view(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
+            stream.slice(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
             byteorder=PROTOCOL_BYTE_ORDER
         )
 
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index d9e6aaf..8dac64f 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -122,7 +122,7 @@ class Query:
         """
         with BinaryStream(conn.client) as stream:
             self.from_python(stream, query_params)
-            response_data = conn.request(stream.getbuffer())
+            response_data = conn.request(stream.getvalue())
 
         response_struct = self.response_type(protocol_context=conn.protocol_context,
                                              following=response_config, **kwargs)
@@ -154,7 +154,7 @@ class Query:
         """
         with AioBinaryStream(conn.client) as stream:
             await self.from_python_async(stream, query_params)
-            data = await conn.request(stream.getbuffer())
+            data = await conn.request(stream.getvalue())
 
         response_struct = self.response_type(protocol_context=conn.protocol_context,
                                              following=response_config, **kwargs)
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index 6495802..f0338e1 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -27,42 +27,42 @@ from pyignite.queries.op_codes import OP_SUCCESS
 from pyignite.stream import READ_BACKWARD
 
 
+class StatusFlagResponseHeader(ctypes.LittleEndianStructure):
+    _pack_ = 1
+    _fields_ = [
+        ('length', ctypes.c_int),
+        ('query_id', ctypes.c_longlong),
+        ('flags', ctypes.c_short)
+    ]
+
+
+class ResponseHeader(ctypes.LittleEndianStructure):
+    _pack_ = 1
+    _fields_ = [
+        ('length', ctypes.c_int),
+        ('query_id', ctypes.c_longlong),
+        ('status_code', ctypes.c_int)
+    ]
+
+
 @attr.s
 class Response:
     following = attr.ib(type=list, factory=list)
     protocol_context = attr.ib(type=type(ProtocolContext), default=None)
-    _response_header = None
     _response_class_name = 'Response'
 
     def __attrs_post_init__(self):
         # replace None with empty list
         self.following = self.following or []
 
-    def __build_header(self):
-        if self._response_header is None:
-            fields = [
-                ('length', ctypes.c_int),
-                ('query_id', ctypes.c_longlong),
-            ]
-
-            if self.protocol_context.is_status_flags_supported():
-                fields.append(('flags', ctypes.c_short))
-            else:
-                fields.append(('status_code', ctypes.c_int),)
-
-            self._response_header = type(
-                'ResponseHeader',
-                (ctypes.LittleEndianStructure,),
-                {
-                    '_pack_': 1,
-                    '_fields_': fields,
-                },
-            )
-        return self._response_header
-
     def __parse_header(self, stream):
         init_pos = stream.tell()
-        header_class = self.__build_header()
+
+        if self.protocol_context.is_status_flags_supported():
+            header_class = StatusFlagResponseHeader
+        else:
+            header_class = ResponseHeader
+
         header_len = ctypes.sizeof(header_class)
         header = stream.read_ctype(header_class)
         stream.seek(header_len, SEEK_CUR)
diff --git a/pyignite/stream/binary_stream.py b/pyignite/stream/binary_stream.py
index 57b4b83..3923a3b 100644
--- a/pyignite/stream/binary_stream.py
+++ b/pyignite/stream/binary_stream.py
@@ -23,7 +23,12 @@ READ_FORWARD = 0
 READ_BACKWARD = 1
 
 
-class BinaryStreamBaseMixin:
+class BinaryStreamBase:
+    def __init__(self, client, buf=None):
+        self.client = client
+        self.stream = BytesIO(buf) if buf else BytesIO()
+        self._buffer = None
+
     @property
     def compact_footer(self) -> bool:
         return self.client.compact_footer
@@ -50,10 +55,11 @@ class BinaryStreamBaseMixin:
         else:
             start, end = init_position - ctype_len, init_position
 
-        buf = self.stream.getbuffer()[start:end]
-        return ctype_class.from_buffer_copy(buf)
+        with self.getbuffer()[start:end] as buf:
+            return ctype_class.from_buffer_copy(buf)
 
     def write(self, buf):
+        self._release_buffer()
         return self.stream.write(buf)
 
     def tell(self):
@@ -62,30 +68,39 @@ class BinaryStreamBaseMixin:
     def seek(self, *args, **kwargs):
         return self.stream.seek(*args, **kwargs)
 
+    def getbuffer(self):
+        if self._buffer:
+            return self._buffer
+
+        self._buffer = self.stream.getbuffer()
+        return self._buffer
+
     def getvalue(self):
         return self.stream.getvalue()
 
-    def getbuffer(self):
-        return self.stream.getbuffer()
-
-    def mem_view(self, start=-1, offset=0):
+    def slice(self, start=-1, offset=0):
         start = start if start >= 0 else self.tell()
-        return self.stream.getbuffer()[start:start + offset]
+        with self.getbuffer()[start:start + offset] as buf:
+            return bytes(buf)
 
     def hashcode(self, start, bytes_len):
-        return ignite_utils.hashcode(self.stream.getbuffer()[start:start + bytes_len])
+        with self.getbuffer()[start:start + bytes_len] as buf:
+            return ignite_utils.hashcode(buf)
+
+    def _release_buffer(self):
+        if self._buffer:
+            self._buffer.release()
+            self._buffer = None
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        try:
-            self.stream.close()
-        except BufferError:
-            pass
+        self._release_buffer()
+        self.stream.close()
 
 
-class BinaryStream(BinaryStreamBaseMixin):
+class BinaryStream(BinaryStreamBase):
     """
     Synchronous binary stream.
     """
@@ -94,8 +109,7 @@ class BinaryStream(BinaryStreamBaseMixin):
         :param client: Client instance, required.
         :param buf: Buffer, optional parameter. If not passed, creates empty BytesIO.
         """
-        self.client = client
-        self.stream = BytesIO(buf) if buf else BytesIO()
+        super().__init__(client, buf)
 
     def get_dataclass(self, header):
         result = self.client.query_binary_type(header.type_id, header.schema_id)
@@ -107,7 +121,7 @@ class BinaryStream(BinaryStreamBaseMixin):
         self.client.register_binary_type(*args, **kwargs)
 
 
-class AioBinaryStream(BinaryStreamBaseMixin):
+class AioBinaryStream(BinaryStreamBase):
     """
     Asyncio binary stream.
     """
@@ -118,8 +132,7 @@ class AioBinaryStream(BinaryStreamBaseMixin):
         :param client: AioClient instance, required.
         :param buf: Buffer, optional parameter. If not passed, creates empty BytesIO.
         """
-        self.client = client
-        self.stream = BytesIO(buf) if buf else BytesIO()
+        super().__init__(client, buf)
 
     async def get_dataclass(self, header):
         result = await self.client.query_binary_type(header.type_id, header.schema_id)