You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2021/07/20 05:24:45 UTC
[ignite-python-thin-client] branch master updated: IGNITE-15102
Implement event handling and monitoring for python thin client - Fixes #46.
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 82f29e2 IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
82f29e2 is described below
commit 82f29e202ea4526b49721993e6ea356640ef66e6
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Tue Jul 20 08:24:13 2021 +0300
IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
---
docs/modules.rst | 1 +
...st => pyignite.connection.protocol_context.rst} | 8 +-
docs/source/pyignite.connection.rst | 6 +
...nite.connection.rst => pyignite.monitoring.rst} | 7 +-
docs/source/pyignite.rst | 1 +
pyignite/aio_client.py | 17 +-
pyignite/client.py | 24 +-
pyignite/connection/aio_connection.py | 2 +-
pyignite/connection/connection.py | 44 +-
pyignite/connection/protocol_context.py | 3 +
pyignite/monitoring.py | 457 +++++++++++++++++++++
pyignite/queries/query.py | 27 +-
pyignite/stream/aio_cluster.py | 53 ---
tests/affinity/test_affinity.py | 6 +-
tests/affinity/test_affinity_request_routing.py | 152 ++++---
tests/common/test_query_listener.py | 127 ++++++
tests/custom/test_connection_events.py | 129 ++++++
tests/security/conftest.py | 24 ++
tests/security/test_auth.py | 77 +++-
tests/security/test_ssl.py | 29 +-
20 files changed, 1012 insertions(+), 182 deletions(-)
diff --git a/docs/modules.rst b/docs/modules.rst
index 0cce570..bdeec8e 100644
--- a/docs/modules.rst
+++ b/docs/modules.rst
@@ -31,3 +31,4 @@ of `pyignite`, intended for end users.
datatypes/parsers
datatypes/cache_props
Exceptions <source/pyignite.exceptions>
+ Monitoring and handling events <source/pyignite.monitoring>
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.connection.protocol_context.rst
similarity index 87%
copy from docs/source/pyignite.connection.rst
copy to docs/source/pyignite.connection.protocol_context.rst
index 90c59db..a5298ba 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.connection.protocol_context.rst
@@ -13,10 +13,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite.connection package
+pyignite.connection.protocol_context package
===========================
-.. automodule:: pyignite.connection
- :members:
- :undoc-members:
- :show-inheritance:
+.. automodule:: pyignite.connection.protocol_context
+ :members:
\ No newline at end of file
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.connection.rst
index 90c59db..29c2e57 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.connection.rst
@@ -20,3 +20,9 @@ pyignite.connection package
:members:
:undoc-members:
:show-inheritance:
+
+Submodules
+----------
+
+.. toctree::
+ pyignite.connection.protocol_context
\ No newline at end of file
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.monitoring.rst
similarity index 88%
copy from docs/source/pyignite.connection.rst
copy to docs/source/pyignite.monitoring.rst
index 90c59db..98b137d 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.monitoring.rst
@@ -13,10 +13,9 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite.connection package
+pyignite.monitoring module
===========================
-.. automodule:: pyignite.connection
+.. automodule:: pyignite.monitoring
:members:
- :undoc-members:
- :show-inheritance:
+ :member-order: bysource
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst
index 2e52500..7a0744c 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.rst
@@ -44,4 +44,5 @@ Submodules
pyignite.transaction
pyignite.cursors
pyignite.exceptions
+ pyignite.monitoring
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 0bb2b8c..083c964 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -16,7 +16,7 @@ import asyncio
import random
import sys
from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict, Optional
+from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
@@ -60,7 +60,8 @@ class AioClient(BaseClient):
Asynchronous Client implementation.
"""
- def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
+ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
+ event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.
@@ -71,9 +72,10 @@ class AioClient(BaseClient):
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
- server node, `True` by default.
+ server node, `True` by default,
+ :param event_listeners: (optional) event listeners.
"""
- super().__init__(compact_footer, partition_aware, **kwargs)
+ super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
self._registry_mux = asyncio.Lock()
self._affinity_query_mux = asyncio.Lock()
@@ -99,9 +101,8 @@ class AioClient(BaseClient):
# do not try to open more nodes
self._current_node = i
-
except connection_errors:
- conn.failed = True
+ pass
self._nodes.append(conn)
@@ -301,7 +302,7 @@ class AioClient(BaseClient):
"""
for _ in range(AFFINITY_RETRIES or 1):
result = await cache_get_node_partitions_async(conn, caches)
- if result.status == 0 and result.value['partition_mapping']:
+ if result.status == 0:
break
await asyncio.sleep(AFFINITY_DELAY)
@@ -341,7 +342,7 @@ class AioClient(BaseClient):
asyncio.ensure_future(
asyncio.gather(
- *[conn.reconnect() for conn in self._nodes if not conn.alive],
+ *[node.reconnect() for node in self._nodes if not node.alive],
return_exceptions=True
)
)
diff --git a/pyignite/client.py b/pyignite/client.py
index 6a499a3..e3dd71b 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -44,7 +44,7 @@ from collections import defaultdict, OrderedDict
import random
import re
from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict, Optional
+from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
@@ -66,6 +66,7 @@ from .utils import (
get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
+from .monitoring import _EventListeners
__all__ = ['Client']
@@ -76,7 +77,8 @@ class BaseClient:
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
- def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
+ def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
+ event_listeners: Optional[Sequence] = None, **kwargs):
self._compact_footer = compact_footer
self._partition_aware = partition_aware
self._connection_args = kwargs
@@ -87,6 +89,7 @@ class BaseClient:
self.affinity_version = (0, 0)
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
self._protocol_context = None
+ self._event_listeners = _EventListeners(event_listeners)
@property
def protocol_context(self):
@@ -338,7 +341,8 @@ class Client(BaseClient):
Synchronous Client implementation.
"""
- def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
+ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
+ event_listeners: Optional[Sequence] = None, **kwargs):
"""
Initialize client.
@@ -349,9 +353,10 @@ class Client(BaseClient):
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
:param partition_aware: (optional) try to calculate the exact data
placement from the key before to issue the key operation to the
- server node, `True` by default.
+ server node, `True` by default,
+ :param event_listeners: (optional) event listeners.
"""
- super().__init__(compact_footer, partition_aware, **kwargs)
+ super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
def connect(self, *args):
"""
@@ -382,7 +387,6 @@ class Client(BaseClient):
self._current_node = i
except connection_errors:
- conn.failed = True
if self.partition_aware:
# schedule the reconnection
conn.reconnect()
@@ -565,7 +569,7 @@ class Client(BaseClient):
"""
for _ in range(AFFINITY_RETRIES or 1):
result = cache_get_node_partitions(conn, caches)
- if result.status == 0 and result.value['partition_mapping']:
+ if result.status == 0:
break
time.sleep(AFFINITY_DELAY)
@@ -608,9 +612,9 @@ class Client(BaseClient):
self._update_affinity(full_affinity)
- for conn in self._nodes:
- if not conn.alive:
- conn.reconnect()
+ for node in self._nodes:
+ if not node.alive:
+ node.reconnect()
c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index c5fa24d..89de49d 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -190,10 +190,10 @@ class AioConnection(BaseConnection):
self._on_handshake_fail(e)
raise e
except Exception as e:
+ self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
- self._on_handshake_fail(e)
raise e
self._on_handshake_success(result)
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index ae5587a..2b9970a 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -99,7 +99,9 @@ class BaseConnection:
def _on_handshake_start(self):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
- self.host, self.port, self.client.protocol_context)
+ self.host, self.port, self.protocol_context)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)
def _on_handshake_success(self, result):
features = BitmaskFeature.from_array(result.get('features', None))
@@ -109,24 +111,45 @@ class BaseConnection:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
- self.host, self.port, self.uuid, self.client.protocol_context)
+ self.host, self.port, self.uuid, self.protocol_context)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)
def _on_handshake_fail(self, err):
+ self.failed = True
+
if isinstance(err, AuthenticationError):
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
self.host, self.port, err)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
else:
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
"with protocol context %s failed: %s",
- self.host, self.port, self.client.protocol_context, err, exc_info=True)
+ self.host, self.port, self.protocol_context, err, exc_info=True)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)
def _on_connection_lost(self, err=None, expected=False):
- if expected and logger.isEnabledFor(logging.DEBUG):
- logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
- self.host, self.port, self.uuid)
+ if expected:
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
+ self.host, self.port, self.uuid)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
else:
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
self.host, self.port, self.uuid, err)
+ if self._enabled_connection_listener:
+ self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)
+
+ @property
+ def _enabled_connection_listener(self):
+ return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener
+
+ @property
+ def _connection_listener(self):
+ return self.client._event_listeners
class Connection(BaseConnection):
@@ -216,10 +239,10 @@ class Connection(BaseConnection):
self._on_handshake_fail(e)
raise e
except Exception as e:
+ self._on_handshake_fail(e)
# restore undefined protocol version
if detecting_protocol:
self.client.protocol_context = None
- self._on_handshake_fail(e)
raise e
self._on_handshake_success(result)
@@ -260,7 +283,7 @@ class Connection(BaseConnection):
if self.alive:
return
- self.close()
+ self.close(on_reconnect=True)
# connect and silence the connection errors
try:
@@ -352,7 +375,7 @@ class Connection(BaseConnection):
return data
- def close(self):
+ def close(self, on_reconnect=False):
"""
Try to mark socket closed, then unlink it. This is recommended but
not required, since sockets are automatically closed when
@@ -364,5 +387,6 @@ class Connection(BaseConnection):
self._socket.close()
except connection_errors:
pass
- self._on_connection_lost(expected=True)
+ if not on_reconnect and not self.failed:
+ self._on_connection_lost(expected=True)
self._socket = None
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index 58f509e..ba6d9e4 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -44,6 +44,9 @@ class ProtocolContext:
if not self.is_feature_flags_supported():
self._features = None
+ def copy(self):
+ return ProtocolContext(self.version, self.features)
+
@property
def version(self):
return getattr(self, '_version', None)
diff --git a/pyignite/monitoring.py b/pyignite/monitoring.py
new file mode 100644
index 0000000..9bbfd20
--- /dev/null
+++ b/pyignite/monitoring.py
@@ -0,0 +1,457 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Tools to monitor client's events.
+
+For example, a simple query logger might be implemented like this::
+
+ import logging
+
+ from pyignite import monitoring
+
+ class QueryLogger(monitoring.QueryEventListener):
+
+ def on_query_start(self, event):
+ logging.info(f"Query {event.op_name} with query id "
+ f"{event.query_id} started on server "
+ f"{event.host}:{event.port}")
+
+ def on_query_fail(self, event):
+ logging.info(f"Query {event.op_name} with query id "
+ f"{event.query_id} on server "
+ f"{event.host}:{event.port} "
+ f"failed in {event.duration}ms "
+ f"with error {event.error_msg}")
+
+ def on_query_success(self, event):
+ logging.info(f"Query {event.op_name} with query id "
+ f"{event.query_id} on server " \
+ f"{event.host}:{event.port} " \
+ f"succeeded in {event.duration}ms")
+
+:class:`~ConnectionEventListener` is also available.
+
+Event listeners can be registered by passing parameter to :class:`~pyignite.client.Client` or
+:class:`~pyignite.aio_client.AioClient` constructor::
+
+ client = Client(event_listeners=[QueryLogger()])
+ with client.connect('127.0.0.1', 10800):
+ ....
+
+.. note:: Events are delivered **synchronously**. Application threads block
+ waiting for event handlers. Care must be taken to ensure that your event handlers are efficient
+ enough to not adversely affect overall application performance.
+
+.. note:: Debug logging is also available, standard ``logging`` is used. Just set ``DEBUG`` level to
+ *pyignite* logger.
+|
+|
+"""
+from typing import Optional, Sequence
+
+
+class _BaseEvent:
+ def __init__(self, **kwargs):
+ if kwargs:
+ for k, v in kwargs.items():
+ object.__setattr__(self, k, v)
+
+ def __setattr__(self, name, value):
+ raise TypeError(f'{self.__class__.__name__} is immutable')
+
+ def __repr__(self):
+ pass
+
+
+class _ConnectionEvent(_BaseEvent):
+ __slots__ = ('host', 'port')
+ host: str
+ port: int
+
+ def __init__(self, host, port, **kwargs):
+ super().__init__(host=host, port=port, **kwargs)
+
+
+class _HandshakeEvent(_ConnectionEvent):
+ __slots__ = ('protocol_context',)
+ protocol_context: Optional['ProtocolContext']
+
+ def __init__(self, host, port, protocol_context=None, **kwargs):
+ super().__init__(host, port, protocol_context=protocol_context.copy() if protocol_context else None, **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"protocol_context={self.protocol_context})"
+
+
+class HandshakeStartEvent(_HandshakeEvent):
+ """
+ Published when a handshake started.
+
+ :ivar host: Address of the node to connect,
+ :ivar port: Port number of the node to connect,
+ :ivar protocol_context: Client's protocol context.
+ """
+ def __init__(self, host, port, protocol_context=None, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host, port, protocol_context, **kwargs)
+
+
+class HandshakeFailedEvent(_HandshakeEvent):
+ """
+ Published when a handshake failed.
+
+ :ivar host: Address of the node to connect,
+ :ivar port: Port number of the node to connect,
+ :ivar protocol_context: Client's protocol context,
+ :ivar error_msg: Error message.
+ """
+ __slots__ = ('error_msg',)
+ error_msg: str
+
+ def __init__(self, host, port, protocol_context=None, err=None, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host, port, protocol_context, error_msg=repr(err) if err else '', **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"protocol_context={self.protocol_context}, error_msg={self.error_msg})"
+
+
+class AuthenticationFailedEvent(HandshakeFailedEvent):
+ """
+ Published when an authentication is failed.
+
+ :ivar host: Address of the node to connect,
+ :ivar port: Port number of the node to connect,
+ :ivar protocol_context: Client protocol context,
+ :ivar error_msg: Error message.
+ """
+ pass
+
+
+class HandshakeSuccessEvent(_HandshakeEvent):
+ """
+ Published when a handshake succeeded.
+
+ :ivar host: Address of the node to connect,
+ :ivar port: Port number of the node to connect,
+ :ivar protocol_context: Client's protocol context,
+ :ivar node_uuid: Node's uuid, string.
+ """
+ __slots__ = ('node_uuid',)
+ node_uuid: str
+
+ def __init__(self, host, port, protocol_context, node_uuid, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host, port, protocol_context, node_uuid=str(node_uuid) if node_uuid else '', **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"node_uuid={self.node_uuid}, protocol_context={self.protocol_context})"
+
+
+class ConnectionClosedEvent(_ConnectionEvent):
+ """
+ Published when a connection to the node is expectedly closed.
+
+ :ivar host: Address of node to connect,
+ :ivar port: Port number of node to connect,
+ :ivar node_uuid: Node uuid, string.
+ """
+ __slots__ = ('node_uuid',)
+ node_uuid: str
+
+ def __init__(self, host, port, node_uuid, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host, port, node_uuid=str(node_uuid) if node_uuid else '', **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, node_uuid={self.node_uuid})"
+
+
+class ConnectionLostEvent(ConnectionClosedEvent):
+ """
+ Published when a connection to the node is lost.
+
+ :ivar host: Address of the node to connect,
+ :ivar port: Port number of the node to connect,
+ :ivar node_uuid: Node's uuid, string,
+ :ivar error_msg: Error message.
+ """
+ __slots__ = ('error_msg',)
+ node_uuid: str
+ error_msg: str
+
+ def __init__(self, host, port, node_uuid, err, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host, port, node_uuid, error_msg=repr(err) if err else '', **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"node_uuid={self.node_uuid}, error_msg={self.error_msg})"
+
+
+class _EventListener:
+ pass
+
+
+class ConnectionEventListener(_EventListener):
+ """
+ Base class for connection event listeners.
+ """
+ def on_handshake_start(self, event: HandshakeStartEvent):
+ """
+ Handle handshake start event.
+
+ :param event: Instance of :class:`HandshakeStartEvent`.
+ """
+ pass
+
+ def on_handshake_success(self, event: HandshakeSuccessEvent):
+ """
+ Handle handshake success event.
+
+ :param event: Instance of :class:`HandshakeSuccessEvent`.
+ """
+ pass
+
+ def on_handshake_fail(self, event: HandshakeFailedEvent):
+ """
+ Handle handshake failed event.
+
+ :param event: Instance of :class:`HandshakeFailedEvent`.
+ """
+ pass
+
+ def on_authentication_fail(self, event: AuthenticationFailedEvent):
+ """
+ Handle authentication failed event.
+
+ :param event: Instance of :class:`AuthenticationFailedEvent`.
+ """
+ pass
+
+ def on_connection_closed(self, event: ConnectionClosedEvent):
+ """
+ Handle connection closed event.
+
+ :param event: Instance of :class:`ConnectionClosedEvent`.
+ """
+ pass
+
+ def on_connection_lost(self, event: ConnectionLostEvent):
+ """
+ Handle connection lost event.
+
+ :param event: Instance of :class:`ConnectionLostEvent`.
+ """
+ pass
+
+
+class _QueryEvent(_BaseEvent):
+ __slots__ = ('host', 'port', 'node_uuid', 'query_id', 'op_code', 'op_name')
+ host: str
+ port: int
+ node_uuid: str
+ query_id: int
+ op_code: int
+ op_name: str
+
+ def __init__(self, host, port, node_uuid, query_id, op_code, op_name, **kwargs):
+ """
+ This class is not supposed to be constructed by user.
+ """
+ super().__init__(host=host, port=port, node_uuid=str(node_uuid) if node_uuid else '',
+ query_id=query_id, op_code=op_code, op_name=op_name, **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \
+ f"op_code={self.op_code}, op_name={self.op_name})"
+
+
+class QueryStartEvent(_QueryEvent):
+ """
+ Published when a client's query started.
+
+ :ivar host: Address of the node on which the query is executed,
+ :ivar port: Port number of the node on which the query is executed,
+ :ivar node_uuid: Node's uuid, string,
+ :ivar query_id: Query's id,
+ :ivar op_code: Operation's id,
+ :ivar op_name: Operation's name.
+ """
+ pass
+
+
+class QuerySuccessEvent(_QueryEvent):
+ """
+ Published when a client's query finished successfully.
+
+ :ivar host: Address of the node on which the query is executed,
+ :ivar port: Port number of the node on which the query is executed,
+ :ivar node_uuid: Node's uuid, string,
+ :ivar query_id: Query's id,
+ :ivar op_code: Operation's id,
+ :ivar op_name: Operation's name,
+ :ivar duration: Query's duration in milliseconds.
+ """
+ __slots__ = ('duration', )
+ duration: int
+
+ def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, **kwargs):
+ super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration, **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \
+ f"op_code={self.op_code}, op_name={self.op_name}, duration={self.duration})"
+
+
+class QueryFailEvent(_QueryEvent):
+ """
+ Published when a client's query failed.
+
+ :ivar host: Address of the node on which the query is executed,
+ :ivar port: Port number of the node on which the query is executed,
+ :ivar node_uuid: Node's uuid, string,
+ :ivar query_id: Query's id,
+ :ivar op_code: Operation's id,
+ :ivar op_name: Operation's name,
+ :ivar duration: Query's duration in milliseconds,
+ :ivar error_msg: Error message.
+ """
+ __slots__ = ('duration', 'err_msg')
+ duration: int
+ err_msg: str
+
+ def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, err, **kwargs):
+ super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration,
+ err_msg=repr(err) if err else '', **kwargs)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+ f"node_uuid={self.node_uuid}, query_id={self.query_id}, op_code={self.op_code}, " \
+ f"op_name={self.op_name}, duration={self.duration}, err_msg={self.err_msg})"
+
+
+class QueryEventListener(_EventListener):
+ """
+ Base class for query event listeners.
+ """
+ def on_query_start(self, event: QueryStartEvent):
+ """
+ Handle query start event.
+
+ :param event: Instance of :class:`QueryStartEvent`.
+ """
+ pass
+
+ def on_query_success(self, event: QuerySuccessEvent):
+ """
+ Handle query success event.
+
+ :param event: Instance of :class:`QuerySuccessEvent`.
+ """
+ pass
+
+ def on_query_fail(self, event: QueryFailEvent):
+ """
+ Handle query fail event.
+
+ :param event: Instance of :class:`QueryFailEvent`.
+ """
+ pass
+
+
+class _EventListeners:
+ def __init__(self, listeners: Optional[Sequence]):
+ self.__connection_listeners = []
+ self.__query_listeners = []
+ if listeners:
+ for listener in listeners:
+ if isinstance(listener, ConnectionEventListener):
+ self.__connection_listeners.append(listener)
+ elif isinstance(listener, QueryEventListener):
+ self.__query_listeners.append(listener)
+
+ @property
+ def enabled_connection_listener(self):
+ return bool(self.__connection_listeners)
+
+ @property
+ def enabled_query_listener(self):
+ return bool(self.__query_listeners)
+
+ def publish_handshake_start(self, host, port, protocol_context):
+ evt = HandshakeStartEvent(host, port, protocol_context)
+ self.__publish_connection_events(lambda listener: listener.on_handshake_start(evt))
+
+ def publish_handshake_success(self, host, port, protocol_context, node_uuid):
+ evt = HandshakeSuccessEvent(host, port, protocol_context, node_uuid)
+ self.__publish_connection_events(lambda listener: listener.on_handshake_success(evt))
+
+ def publish_handshake_fail(self, host, port, protocol_context, err):
+ evt = HandshakeFailedEvent(host, port, protocol_context, err)
+ self.__publish_connection_events(lambda listener: listener.on_handshake_fail(evt))
+
+ def publish_authentication_fail(self, host, port, protocol_context, err):
+ evt = AuthenticationFailedEvent(host, port, protocol_context, err)
+ self.__publish_connection_events(lambda listener: listener.on_authentication_fail(evt))
+
+ def publish_connection_closed(self, host, port, node_uuid):
+ evt = ConnectionClosedEvent(host, port, node_uuid)
+ self.__publish_connection_events(lambda listener: listener.on_connection_closed(evt))
+
+ def publish_connection_lost(self, host, port, node_uuid, err):
+ evt = ConnectionLostEvent(host, port, node_uuid, err)
+ self.__publish_connection_events(lambda listener: listener.on_connection_lost(evt))
+
+ def publish_query_start(self, host, port, node_uuid, query_id, op_code, op_name):
+ evt = QueryStartEvent(host, port, node_uuid, query_id, op_code, op_name)
+ self.__publish_query_events(lambda listener: listener.on_query_start(evt))
+
+ def publish_query_success(self, host, port, node_uuid, query_id, op_code, op_name, duration):
+ evt = QuerySuccessEvent(host, port, node_uuid, query_id, op_code, op_name, duration)
+ self.__publish_query_events(lambda listener: listener.on_query_success(evt))
+
+ def publish_query_fail(self, host, port, node_uuid, query_id, op_code, op_name, duration, err):
+ evt = QueryFailEvent(host, port, node_uuid, query_id, op_code, op_name, duration, err)
+ self.__publish_query_events(lambda listener: listener.on_query_fail(evt))
+
+ def __publish_connection_events(self, callback):
+ try:
+ for listener in self.__connection_listeners:
+ callback(listener)
+ except: # noqa: 13
+ pass
+
+ def __publish_query_events(self, callback):
+ try:
+ for listener in self.__query_listeners:
+ callback(listener)
+ except: # noqa: 13
+ pass
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 89c354e..c141b26 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -227,12 +227,25 @@ class Query:
# build result
return APIResult(response)
+ @staticmethod
+ def _enabled_query_listener(conn):
+ client = conn.client
+ return client._event_listeners and client._event_listeners.enabled_query_listener
+
+ @staticmethod
+ def _event_listener(conn):
+ return conn.client._event_listeners
+
def _on_query_started(self, conn):
+ self._start_ts = time.monotonic()
if logger.isEnabledFor(logging.DEBUG):
- self._start_ts = time.monotonic()
logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)",
self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid)
+ if self._enabled_query_listener(conn):
+ self._event_listener(conn).publish_query_start(conn.host, conn.port, conn.uuid, self.query_id,
+ self.op_code, _get_op_code_name(self.op_code))
+
def _on_query_finished(self, conn, result=None, err=None):
if logger.isEnabledFor(logging.DEBUG):
dur_ms = _sec_to_millis(time.monotonic() - self._start_ts)
@@ -240,12 +253,20 @@ class Query:
err = result.message
if err:
logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
- "in %.3f ms: %s", self.query_id, _get_op_code_name(self.op_code),
+ "in %d ms: %s", self.query_id, _get_op_code_name(self.op_code),
conn.host, conn.port, conn.uuid, dur_ms, err)
+ if self._enabled_query_listener(conn):
+ self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id,
+ self.op_code, _get_op_code_name(self.op_code),
+ dur_ms, err)
else:
logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
- "successfully in %.3f ms", self.query_id, _get_op_code_name(self.op_code),
+ "successfully in %d ms", self.query_id, _get_op_code_name(self.op_code),
conn.host, conn.port, conn.uuid, dur_ms)
+ if self._enabled_query_listener(conn):
+ self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id,
+ self.op_code, _get_op_code_name(self.op_code),
+ dur_ms)
class ConfigQuery(Query):
diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py
deleted file mode 100644
index 8a2f98e..0000000
--- a/pyignite/stream/aio_cluster.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-This module contains `AioCluster` that lets you get info and change state of the
-whole cluster.
-"""
-from pyignite import AioClient
-from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
-
-
-class AioCluster:
- """
- Ignite cluster abstraction. Users should never use this class directly,
- but construct its instances with
- :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
- """
-
- def __init__(self, client: 'AioClient'):
- self._client = client
-
- async def get_state(self):
- """
- Gets current cluster state.
-
- :return: Current cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
- """
- return await cluster_get_state_async(await self._client.random_node())
-
- async def set_state(self, state):
- """
- Changes current cluster state to the given.
-
- Note: Deactivation clears in-memory caches (without persistence)
- including the system caches.
-
- :param state: New cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
- """
- return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py
index 3097991..c9a6b60 100644
--- a/tests/affinity/test_affinity.py
+++ b/tests/affinity/test_affinity.py
@@ -36,7 +36,6 @@ from tests.util import wait_for_condition, wait_for_condition_async
def test_get_node_partitions(client, caches):
cache_ids = [cache.cache_id for cache in caches]
- __wait_for_ready_affinity(client, cache_ids)
mappings = __get_mappings(client, cache_ids)
__check_mappings(mappings, cache_ids)
@@ -44,7 +43,6 @@ def test_get_node_partitions(client, caches):
@pytest.mark.asyncio
async def test_get_node_partitions_async(async_client, async_caches):
cache_ids = [cache.cache_id for cache in async_caches]
- await __wait_for_ready_affinity(async_client, cache_ids)
mappings = await __get_mappings(async_client, cache_ids)
__check_mappings(mappings, cache_ids)
@@ -157,6 +155,7 @@ def __create_caches_fixture(client):
caches = []
try:
caches = generate_caches()
+ __wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
yield caches
finally:
for cache in caches:
@@ -166,6 +165,7 @@ def __create_caches_fixture(client):
caches = []
try:
caches = await generate_caches()
+ await __wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
yield caches
finally:
await asyncio.gather(*[cache.destroy() for cache in caches])
@@ -180,6 +180,7 @@ def cache(client):
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
try:
+ __wait_for_ready_affinity(client, [cache.cache_id])
yield cache
finally:
cache.destroy()
@@ -192,6 +193,7 @@ async def async_cache(async_client):
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
try:
+ await __wait_for_ready_affinity(async_client, [cache.cache_id])
yield cache
finally:
await cache.destroy()
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 0d0ec24..b73eff3 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -22,11 +22,10 @@ import pytest
from pyignite import GenericObjectMeta, AioClient, Client
from pyignite.aio_cache import AioCache
-from pyignite.connection import Connection, AioConnection
-from pyignite.constants import PROTOCOL_BYTE_ORDER
from pyignite.datatypes import String, LongObject
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER, PROP_CACHE_KEY_CONFIGURATION, PROP_CACHE_MODE
+from pyignite.monitoring import QueryEventListener
from tests.util import wait_for_condition, wait_for_condition_async, start_ignite, kill_process_tree
try:
@@ -35,41 +34,37 @@ except ImportError:
from async_generator import asynccontextmanager
requests = deque()
-old_send = Connection.send
-old_send_async = AioConnection._send
-def patched_send(self, *args, **kwargs):
- """Patched send function that push to queue idx of server to which request is routed."""
- buf = args[0]
- if buf and len(buf) >= 6:
- op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
- # Filter only caches operation.
- if 1000 <= op_code < 1100:
- requests.append(self.port % 100)
- return old_send(self, *args, **kwargs)
+class QueryRouteListener(QueryEventListener):
+ def on_query_start(self, event):
+ if 1000 <= event.op_code < 1100:
+ requests.append(event.port % 100)
-async def patched_send_async(self, *args, **kwargs):
- """Patched send function that push to queue idx of server to which request is routed."""
- buf = args[1]
- if buf and len(buf) >= 6:
- op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
- # Filter only caches operation.
- if 1000 <= op_code < 1100:
- requests.append(self.port % 100)
- return await old_send_async(self, *args, **kwargs)
+client_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)]
-def setup_function():
- requests.clear()
- Connection.send = patched_send
- AioConnection._send = patched_send_async
+@pytest.fixture
+def client():
+ client = Client(partition_aware=True, event_listeners=[QueryRouteListener()])
+ try:
+ client.connect(client_connection_string)
+ yield client
+ finally:
+ requests.clear()
+ client.close()
-def teardown_function():
- Connection.send = old_send
- AioConnection.send = old_send_async
+@pytest.fixture
+async def async_client(event_loop):
+ client = AioClient(partition_aware=True, event_listeners=[QueryRouteListener()])
+ try:
+ await client.connect(client_connection_string)
+ yield client
+ finally:
+ requests.clear()
+ await client.close()
def wait_for_affinity_distribution(cache, key, node_idx, timeout=30):
@@ -112,7 +107,8 @@ async def wait_for_affinity_distribution_async(cache, key, node_idx, timeout=30)
@pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)])
@pytest.mark.parametrize("backups", [0, 1, 2, 3])
-def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups, client):
+def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups,
+ client):
cache = client.get_or_create_cache({
PROP_NAME: request.node.name + str(backups),
PROP_BACKUPS_NUMBER: backups,
@@ -210,47 +206,24 @@ def test_cache_operation_on_custom_affinity_key_routes_request_to_primary_node(r
assert requests.pop() == grid_idx
-client_routed_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)]
-
-
@pytest.fixture
-def client_routed():
- client = Client(partition_aware=True)
- try:
- client.connect(client_routed_connection_string)
- yield client
- finally:
- client.close()
-
-
-@pytest.fixture
-def client_routed_cache(client_routed, request):
- yield client_routed.get_or_create_cache(request.node.name)
+def client_cache(client, request):
+ yield client.get_or_create_cache(request.node.name)
@pytest.fixture
-async def async_client_routed(event_loop):
- client = AioClient(partition_aware=True)
- try:
- await client.connect(client_routed_connection_string)
- yield client
- finally:
- await client.close()
-
-
-@pytest.fixture
-async def async_client_routed_cache(async_client_routed, request):
- cache = await async_client_routed.get_or_create_cache(request.node.name)
+async def async_client_cache(async_client, request):
+ cache = await async_client.get_or_create_cache(request.node.name)
yield cache
-def test_cache_operation_routed_to_new_cluster_node(client_routed_cache):
- __perform_cache_operation_routed_to_new_node(client_routed_cache)
+def test_cache_operation_routed_to_new_cluster_node(client_cache):
+ __perform_cache_operation_routed_to_new_node(client_cache)
@pytest.mark.asyncio
-async def test_cache_operation_routed_to_new_cluster_node_async(async_client_routed_cache):
- await __perform_cache_operation_routed_to_new_node(async_client_routed_cache)
+async def test_cache_operation_routed_to_new_cluster_node_async(async_client_cache):
+ await __perform_cache_operation_routed_to_new_node(async_client_cache)
def __perform_cache_operation_routed_to_new_node(cache):
@@ -328,6 +301,55 @@ async def test_replicated_cache_operation_routed_to_random_node_async(async_repl
await verify_random_node(async_replicated_cache)
+def test_replicated_cache_operation_not_routed_to_failed_node(replicated_cache):
+ srv = start_ignite(idx=4)
+ try:
+ while True:
+ replicated_cache.put(1, 1)
+
+ if requests.pop() == 4:
+ break
+
+ kill_process_tree(srv.pid)
+
+ num_failures = 0
+ for i in range(100):
+ # Request may fail one time, because query can be requested before affinity update or connection
+ # lost will be detected.
+ try:
+ replicated_cache.put(1, 1)
+ except: # noqa 13
+ num_failures += 1
+ assert num_failures <= 1, "Expected no more than 1 failure."
+ finally:
+ kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_replicated_cache_operation_not_routed_to_failed_node_async(async_replicated_cache):
+ srv = start_ignite(idx=4)
+ try:
+ while True:
+ await async_replicated_cache.put(1, 1)
+
+ if requests.pop() == 4:
+ break
+
+ kill_process_tree(srv.pid)
+
+ num_failures = 0
+ for i in range(100):
+ # Request may fail one time, because query can be requested before affinity update or connection
+ # lost will be detected.
+ try:
+ await async_replicated_cache.put(1, 1)
+ except: # noqa 13
+ num_failures += 1
+ assert num_failures <= 1, "Expected no more than 1 failure."
+ finally:
+ kill_process_tree(srv.pid)
+
+
def verify_random_node(cache):
key = 1
@@ -423,8 +445,8 @@ async def test_new_registered_cache_affinity_async(async_client):
assert requests.pop() == 3
-def test_all_registered_cache_updated_on_new_server(client_routed):
- with create_caches(client_routed) as caches:
+def test_all_registered_cache_updated_on_new_server(client):
+ with create_caches(client) as caches:
key = 12
test_cache = random.choice(caches)
wait_for_affinity_distribution(test_cache, key, 3)
@@ -444,8 +466,8 @@ def test_all_registered_cache_updated_on_new_server(client_routed):
@pytest.mark.asyncio
-async def test_all_registered_cache_updated_on_new_server_async(async_client_routed):
- async with create_caches_async(async_client_routed) as caches:
+async def test_all_registered_cache_updated_on_new_server_async(async_client):
+ async with create_caches_async(async_client) as caches:
key = 12
test_cache = random.choice(caches)
await wait_for_affinity_distribution_async(test_cache, key, 3)
diff --git a/tests/common/test_query_listener.py b/tests/common/test_query_listener.py
new file mode 100644
index 0000000..afff542
--- /dev/null
+++ b/tests/common/test_query_listener.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.exceptions import CacheError
+from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent
+from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE
+
+events = []
+
+
+class QueryRouteListener(QueryEventListener):
+ def on_query_start(self, event):
+ if event.op_code != OP_CACHE_PARTITIONS:
+ events.append(event)
+
+ def on_query_fail(self, event):
+ if event.op_code != OP_CACHE_PARTITIONS:
+ events.append(event)
+
+ def on_query_success(self, event):
+ if event.op_code != OP_CACHE_PARTITIONS:
+ events.append(event)
+
+
+@pytest.fixture
+def client():
+ client = Client(event_listeners=[QueryRouteListener()])
+ try:
+ client.connect('127.0.0.1', 10801)
+ yield client
+ finally:
+ client.close()
+ events.clear()
+
+
+@pytest.fixture
+async def async_client(event_loop):
+ client = AioClient(event_listeners=[QueryRouteListener()])
+ try:
+ await client.connect('127.0.0.1', 10801)
+ yield client
+ finally:
+ await client.close()
+ events.clear()
+
+
+def test_query_fail_events(request, client):
+ with pytest.raises(CacheError):
+ cache = client.get_cache(request.node.name)
+ cache.put(1, 1)
+
+ __assert_fail_events(client)
+
+
+@pytest.mark.asyncio
+async def test_query_fail_events_async(request, async_client):
+ with pytest.raises(CacheError):
+ cache = await async_client.get_cache(request.node.name)
+ await cache.put(1, 1)
+
+ __assert_fail_events(async_client)
+
+
+def __assert_fail_events(client):
+ assert len(events) == 2
+ conn = client._nodes[0]
+ for ev in events:
+ if isinstance(ev, QueryStartEvent):
+ assert ev.op_code == OP_CACHE_PUT
+ assert ev.op_name == 'OP_CACHE_PUT'
+ assert ev.host == conn.host
+ assert ev.port == conn.port
+ assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+
+ if isinstance(ev, QueryFailEvent):
+ assert ev.op_code == OP_CACHE_PUT
+ assert ev.op_name == 'OP_CACHE_PUT'
+ assert ev.host == conn.host
+ assert ev.port == conn.port
+ assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+ assert 'Cache does not exist' in ev.err_msg
+ assert ev.duration > 0
+
+
+def test_query_success_events(client):
+ client.get_cluster().get_state()
+ __assert_success_events(client)
+
+
+@pytest.mark.asyncio
+async def test_query_success_events_async(async_client):
+ await async_client.get_cluster().get_state()
+ __assert_success_events(async_client)
+
+
+def __assert_success_events(client):
+ assert len(events) == 2
+ conn = client._nodes[0]
+ for ev in events:
+ if isinstance(ev, QueryStartEvent):
+ assert ev.op_code == OP_CLUSTER_GET_STATE
+ assert ev.op_name == 'OP_CLUSTER_GET_STATE'
+ assert ev.host == conn.host
+ assert ev.port == conn.port
+ assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+
+ if isinstance(ev, QuerySuccessEvent):
+ assert ev.op_code == OP_CLUSTER_GET_STATE
+ assert ev.op_name == 'OP_CLUSTER_GET_STATE'
+ assert ev.host == conn.host
+ assert ev.port == conn.port
+ assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+ assert ev.duration > 0
diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py
new file mode 100644
index 0000000..bee9395
--- /dev/null
+++ b/tests/custom/test_connection_events.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import random
+
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.monitoring import ConnectionEventListener, ConnectionLostEvent, ConnectionClosedEvent, \
+ HandshakeSuccessEvent, HandshakeFailedEvent, HandshakeStartEvent
+
+from tests.util import start_ignite_gen, kill_process_tree
+
+
+@pytest.fixture(autouse=True)
+def server1():
+ yield from start_ignite_gen(idx=1)
+
+
+@pytest.fixture(autouse=True)
+def server2():
+ yield from start_ignite_gen(idx=2)
+
+
+events = []
+
+
+def teardown_function():
+ events.clear()
+
+
+class RecordingConnectionEventListener(ConnectionEventListener):
+ def on_handshake_start(self, event):
+ events.append(event)
+
+ def on_handshake_success(self, event):
+ events.append(event)
+
+ def on_handshake_fail(self, event):
+ events.append(event)
+
+ def on_authentication_fail(self, event):
+ events.append(event)
+
+ def on_connection_closed(self, event):
+ events.append(event)
+
+ def on_connection_lost(self, event):
+ events.append(event)
+
+
+def test_events(request, server2):
+ client = Client(event_listeners=[RecordingConnectionEventListener()])
+ with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
+ protocol_context = client.protocol_context
+ nodes = {conn.port: conn for conn in client._nodes}
+ cache = client.get_or_create_cache(request.node.name)
+ kill_process_tree(server2.pid)
+
+ while True:
+ try:
+ cache.put(random.randint(0, 1000), 1)
+ except: # noqa 13
+ pass
+
+ if any(isinstance(e, ConnectionLostEvent) for e in events):
+ break
+
+ __assert_events(nodes, protocol_context)
+
+
+@pytest.mark.asyncio
+async def test_events_async(request, server2):
+ client = AioClient(event_listeners=[RecordingConnectionEventListener()])
+ async with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
+ protocol_context = client.protocol_context
+ nodes = {conn.port: conn for conn in client._nodes}
+ cache = await client.get_or_create_cache(request.node.name)
+ kill_process_tree(server2.pid)
+
+ while True:
+ try:
+ await cache.put(random.randint(0, 1000), 1)
+ except: # noqa 13
+ pass
+
+ if any(isinstance(e, ConnectionLostEvent) for e in events):
+ break
+
+ __assert_events(nodes, protocol_context)
+
+
+def __assert_events(nodes, protocol_context):
+ assert len([e for e in events if isinstance(e, ConnectionLostEvent)]) == 1
+ # ConnectionLostEvent is a subclass of ConnectionClosedEvent
+ assert len([e for e in events if type(e) == ConnectionClosedEvent]) == 1
+ assert len([e for e in events if isinstance(e, HandshakeSuccessEvent)]) == 2
+
+ for ev in events:
+ assert ev.host == '127.0.0.1'
+ if isinstance(ev, ConnectionLostEvent):
+ assert ev.port == 10802
+ assert ev.node_uuid == str(nodes[ev.port].uuid)
+ assert ev.error_msg
+ elif isinstance(ev, HandshakeStartEvent):
+ assert ev.protocol_context == protocol_context
+ assert ev.port in {10801, 10802}
+ elif isinstance(ev, HandshakeFailedEvent):
+ assert ev.port == 10802
+ assert ev.protocol_context == protocol_context
+ assert ev.error_msg
+ elif isinstance(ev, HandshakeSuccessEvent):
+ assert ev.port in {10801, 10802}
+ assert ev.node_uuid == str(nodes[ev.port].uuid)
+ assert ev.protocol_context == protocol_context
+ elif isinstance(ev, ConnectionClosedEvent):
+ assert ev.port == 10801
+ assert ev.node_uuid == str(nodes[ev.port].uuid)
diff --git a/tests/security/conftest.py b/tests/security/conftest.py
index d5de5a1..8845c31 100644
--- a/tests/security/conftest.py
+++ b/tests/security/conftest.py
@@ -16,6 +16,7 @@ import os
import pytest
+from pyignite import monitoring
from tests.util import get_test_dir
@@ -47,3 +48,26 @@ def __create_ssl_param(with_password=False):
'ssl_certfile': cert,
'ssl_ca_certfile': cert
}
+
+
+class AccumulatingConnectionListener(monitoring.ConnectionEventListener):
+ def __init__(self):
+ self.events = []
+
+ def on_handshake_start(self, event):
+ self.events.append(event)
+
+ def on_handshake_success(self, event):
+ self.events.append(event)
+
+ def on_handshake_fail(self, event):
+ self.events.append(event)
+
+ def on_authentication_fail(self, event):
+ self.events.append(event)
+
+ def on_connection_closed(self, event):
+ self.events.append(event)
+
+ def on_connection_lost(self, event):
+ self.events.append(event)
diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py
index 3586c91..503cf88 100644
--- a/tests/security/test_auth.py
+++ b/tests/security/test_auth.py
@@ -18,7 +18,11 @@ import re
import pytest
from pyignite import Client, AioClient
+from pyignite.monitoring import (
+ HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent, AuthenticationFailedEvent
+)
from pyignite.exceptions import AuthenticationError
+from tests.security.conftest import AccumulatingConnectionListener
from tests.util import start_ignite_gen, clear_ignite_work_dir
DEFAULT_IGNITE_USERNAME = 'ignite'
@@ -44,32 +48,58 @@ def cleanup():
def test_auth_success(with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
- client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
+ listener = AccumulatingConnectionListener()
+ client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD,
+ event_listeners=[listener], **ssl_params)
with caplog.at_level(logger='pyignite', level=logging.DEBUG):
with client.connect("127.0.0.1", 10801):
assert all(node.alive for node in client._nodes)
+ conn = client._nodes[0]
- __assert_successful_connect_log(caplog)
+ __assert_successful_connect_log(conn, caplog)
+ __assert_successful_connect_events(conn, listener)
@pytest.mark.asyncio
async def test_auth_success_async(with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
- client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
+ listener = AccumulatingConnectionListener()
+ client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD,
+ event_listeners=[listener], **ssl_params)
with caplog.at_level(logger='pyignite', level=logging.DEBUG):
async with client.connect("127.0.0.1", 10801):
assert all(node.alive for node in client._nodes)
+ conn = client._nodes[0]
- __assert_successful_connect_log(caplog)
+ __assert_successful_connect_log(conn, caplog)
+ __assert_successful_connect_events(conn, listener)
-def __assert_successful_connect_log(caplog):
- assert any(re.match(r'Connecting to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
- assert any(re.match(r'Connected to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
- assert any(re.match(r'Connection closed to node\(address=127.0.0.1,\s+port=10801', r.message)
+def __assert_successful_connect_log(conn, caplog):
+ assert any(re.match(rf'Connecting to node\(address={conn.host},\s+port={conn.port}', r.message)
+ for r in caplog.records)
+ assert any(re.match(rf'Connected to node\(address={conn.host},\s+port={conn.port}', r.message)
+ for r in caplog.records)
+ assert any(re.match(rf'Connection closed to node\(address={conn.host},\s+port={conn.port}', r.message)
for r in caplog.records)
+def __assert_successful_connect_events(conn, listener):
+ event_classes = (HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent)
+
+ for cls in event_classes:
+ any(isinstance(ev, cls) for ev in listener.events)
+
+ for ev in listener.events:
+ if isinstance(ev, event_classes):
+ assert ev.host == conn.host
+ assert ev.port == conn.port
+ if isinstance(ev, (HandshakeSuccessEvent, ConnectionClosedEvent)):
+ assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+ if isinstance(ev, HandshakeSuccessEvent):
+ assert ev.protocol_context
+
+
auth_failed_params = [
[DEFAULT_IGNITE_USERNAME, None],
['invalid_user', 'invalid_password'],
@@ -83,13 +113,15 @@ auth_failed_params = [
)
def test_auth_failed(username, password, with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
-
+ listener = AccumulatingConnectionListener()
with pytest.raises(AuthenticationError):
- client = Client(username=username, password=password, **ssl_params)
+ client = Client(username=username, password=password,
+ event_listeners=[listener], **ssl_params)
with client.connect("127.0.0.1", 10801):
pass
- __assert_auth_failed_log(caplog)
+ __assert_auth_failed_log(caplog)
+ __assert_auth_failed_listener(listener)
@pytest.mark.parametrize(
@@ -99,15 +131,30 @@ def test_auth_failed(username, password, with_ssl, ssl_params, caplog):
@pytest.mark.asyncio
async def test_auth_failed_async(username, password, with_ssl, ssl_params, caplog):
ssl_params['use_ssl'] = with_ssl
-
+ listener = AccumulatingConnectionListener()
with pytest.raises(AuthenticationError):
- client = AioClient(username=username, password=password, **ssl_params)
+ client = AioClient(username=username, password=password,
+ event_listeners=[listener], **ssl_params)
async with client.connect("127.0.0.1", 10801):
pass
- __assert_auth_failed_log(caplog)
+ __assert_auth_failed_log(caplog)
+ __assert_auth_failed_listener(listener)
def __assert_auth_failed_log(caplog):
pattern = r'Authentication failed while connecting to node\(address=127.0.0.1,\s+port=10801'
- assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)
+ assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR)
+ for r in caplog.records)
+
+
+def __assert_auth_failed_listener(listener):
+ found = False
+ for ev in listener.events:
+ if isinstance(ev, AuthenticationFailedEvent):
+ found = True
+ assert ev.host == '127.0.0.1'
+ assert ev.port == 10801
+ assert ev.protocol_context
+ assert 'AuthenticationError' in ev.error_msg
+ assert found
diff --git a/tests/security/test_ssl.py b/tests/security/test_ssl.py
index 2cbed4b..ed0808b 100644
--- a/tests/security/test_ssl.py
+++ b/tests/security/test_ssl.py
@@ -17,8 +17,9 @@ import re
import pytest
-from pyignite import Client, AioClient
+from pyignite import Client, AioClient, monitoring
from pyignite.exceptions import ReconnectError
+from tests.security.conftest import AccumulatingConnectionListener
from tests.util import start_ignite_gen, get_or_create_cache, get_or_create_cache_async
@@ -76,25 +77,41 @@ invalid_params = [
@pytest.mark.parametrize('invalid_ssl_params', invalid_params)
def test_connection_error_with_incorrect_config(invalid_ssl_params, caplog):
+ listener = AccumulatingConnectionListener()
with pytest.raises(ReconnectError):
- client = Client(**invalid_ssl_params)
+ client = Client(event_listeners=[listener], **invalid_ssl_params)
with client.connect([("127.0.0.1", 10801)]):
pass
- __assert_handshake_failed_log(caplog)
+ __assert_handshake_failed_log(caplog)
+ __assert_handshake_failed_listener(listener)
@pytest.mark.parametrize('invalid_ssl_params', invalid_params)
@pytest.mark.asyncio
async def test_connection_error_with_incorrect_config_async(invalid_ssl_params, caplog):
+ listener = AccumulatingConnectionListener()
with pytest.raises(ReconnectError):
- client = AioClient(**invalid_ssl_params)
+ client = AioClient(event_listeners=[listener], **invalid_ssl_params)
async with client.connect([("127.0.0.1", 10801)]):
pass
- __assert_handshake_failed_log(caplog)
+ __assert_handshake_failed_log(caplog)
+ __assert_handshake_failed_listener(listener)
def __assert_handshake_failed_log(caplog):
pattern = r'Failed to perform handshake, connection to node\(address=127.0.0.1,\s+port=10801.*failed:'
- assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)
+ assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR)
+ for r in caplog.records)
+
+
+def __assert_handshake_failed_listener(listener):
+ found = False
+ for ev in listener.events:
+ if isinstance(ev, monitoring.HandshakeFailedEvent):
+ found = True
+ assert ev.host == '127.0.0.1'
+ assert ev.port == 10801
+ assert ev.error_msg
+ assert found