Posted to commits@ignite.apache.org by iv...@apache.org on 2021/07/20 05:24:45 UTC

[ignite-python-thin-client] branch master updated: IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.

This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 82f29e2  IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
82f29e2 is described below

commit 82f29e202ea4526b49721993e6ea356640ef66e6
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Tue Jul 20 08:24:13 2021 +0300

    IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
---
 docs/modules.rst                                   |   1 +
 ...st => pyignite.connection.protocol_context.rst} |   8 +-
 docs/source/pyignite.connection.rst                |   6 +
 ...nite.connection.rst => pyignite.monitoring.rst} |   7 +-
 docs/source/pyignite.rst                           |   1 +
 pyignite/aio_client.py                             |  17 +-
 pyignite/client.py                                 |  24 +-
 pyignite/connection/aio_connection.py              |   2 +-
 pyignite/connection/connection.py                  |  44 +-
 pyignite/connection/protocol_context.py            |   3 +
 pyignite/monitoring.py                             | 457 +++++++++++++++++++++
 pyignite/queries/query.py                          |  27 +-
 pyignite/stream/aio_cluster.py                     |  53 ---
 tests/affinity/test_affinity.py                    |   6 +-
 tests/affinity/test_affinity_request_routing.py    | 152 ++++---
 tests/common/test_query_listener.py                | 127 ++++++
 tests/custom/test_connection_events.py             | 129 ++++++
 tests/security/conftest.py                         |  24 ++
 tests/security/test_auth.py                        |  77 +++-
 tests/security/test_ssl.py                         |  29 +-
 20 files changed, 1012 insertions(+), 182 deletions(-)
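
For reviewers: the new ``event_listeners`` API introduced by this commit is used roughly as
follows (a minimal sketch based on the listener classes added in ``pyignite/monitoring.py``
below; the cache name and address are arbitrary)::

    from pyignite import Client, monitoring

    class ConnectionLogger(monitoring.ConnectionEventListener):
        def on_handshake_success(self, event):
            print(f"Connected to {event.host}:{event.port}, node {event.node_uuid}")

        def on_connection_lost(self, event):
            print(f"Lost connection to {event.host}:{event.port}: {event.error_msg}")

    client = Client(event_listeners=[ConnectionLogger()])
    with client.connect('127.0.0.1', 10800):
        cache = client.get_or_create_cache('my-cache')
        cache.put(1, 1)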

diff --git a/docs/modules.rst b/docs/modules.rst
index 0cce570..bdeec8e 100644
--- a/docs/modules.rst
+++ b/docs/modules.rst
@@ -31,3 +31,4 @@ of `pyignite`, intended for end users.
     datatypes/parsers
     datatypes/cache_props
     Exceptions <source/pyignite.exceptions>
+    Monitoring and handling events <source/pyignite.monitoring>
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.connection.protocol_context.rst
similarity index 87%
copy from docs/source/pyignite.connection.rst
copy to docs/source/pyignite.connection.protocol_context.rst
index 90c59db..a5298ba 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.connection.protocol_context.rst
@@ -13,10 +13,8 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 
-pyignite.connection package
+pyignite.connection.protocol_context package
 ===========================
 
-.. automodule:: pyignite.connection
-    :members:
-    :undoc-members:
-    :show-inheritance:
+.. automodule:: pyignite.connection.protocol_context
+    :members:
\ No newline at end of file
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.connection.rst
index 90c59db..29c2e57 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.connection.rst
@@ -20,3 +20,9 @@ pyignite.connection package
     :members:
     :undoc-members:
     :show-inheritance:
+
+Submodules
+----------
+
+.. toctree::
+    pyignite.connection.protocol_context
\ No newline at end of file
diff --git a/docs/source/pyignite.connection.rst b/docs/source/pyignite.monitoring.rst
similarity index 88%
copy from docs/source/pyignite.connection.rst
copy to docs/source/pyignite.monitoring.rst
index 90c59db..98b137d 100644
--- a/docs/source/pyignite.connection.rst
+++ b/docs/source/pyignite.monitoring.rst
@@ -13,10 +13,9 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 
-pyignite.connection package
+pyignite.monitoring module
 ===========================
 
-.. automodule:: pyignite.connection
+.. automodule:: pyignite.monitoring
     :members:
-    :undoc-members:
-    :show-inheritance:
+    :member-order: bysource
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst
index 2e52500..7a0744c 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.rst
@@ -44,4 +44,5 @@ Submodules
     pyignite.transaction
     pyignite.cursors
     pyignite.exceptions
+    pyignite.monitoring
 
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 0bb2b8c..083c964 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -16,7 +16,7 @@ import asyncio
 import random
 import sys
 from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict, Optional
+from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
 
 from .aio_cluster import AioCluster
 from .api import cache_get_node_partitions_async
@@ -60,7 +60,8 @@ class AioClient(BaseClient):
     Asynchronous Client implementation.
     """
 
-    def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
+    def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
+                 event_listeners: Optional[Sequence] = None, **kwargs):
         """
         Initialize client.
 
@@ -71,9 +72,10 @@ class AioClient(BaseClient):
          https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
         :param partition_aware: (optional) try to calculate the exact data
          placement from the key before to issue the key operation to the
-         server node, `True` by default.
+         server node, `True` by default,
+        :param event_listeners: (optional) a sequence of event listeners (see :mod:`pyignite.monitoring`).
         """
-        super().__init__(compact_footer, partition_aware, **kwargs)
+        super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
         self._registry_mux = asyncio.Lock()
         self._affinity_query_mux = asyncio.Lock()
 
@@ -99,9 +101,8 @@ class AioClient(BaseClient):
 
                         # do not try to open more nodes
                         self._current_node = i
-
                 except connection_errors:
-                    conn.failed = True
+                    pass
 
             self._nodes.append(conn)
 
@@ -301,7 +302,7 @@ class AioClient(BaseClient):
         """
         for _ in range(AFFINITY_RETRIES or 1):
             result = await cache_get_node_partitions_async(conn, caches)
-            if result.status == 0 and result.value['partition_mapping']:
+            if result.status == 0:
                 break
             await asyncio.sleep(AFFINITY_DELAY)
 
@@ -341,7 +342,7 @@ class AioClient(BaseClient):
 
                             asyncio.ensure_future(
                                 asyncio.gather(
-                                    *[conn.reconnect() for conn in self._nodes if not conn.alive],
+                                    *[node.reconnect() for node in self._nodes if not node.alive],
                                     return_exceptions=True
                                 )
                             )
diff --git a/pyignite/client.py b/pyignite/client.py
index 6a499a3..e3dd71b 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -44,7 +44,7 @@ from collections import defaultdict, OrderedDict
 import random
 import re
 from itertools import chain
-from typing import Iterable, Type, Union, Any, Dict, Optional
+from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
 
 from .api import cache_get_node_partitions
 from .api.binary import get_binary_type, put_binary_type
@@ -66,6 +66,7 @@ from .utils import (
     get_field_by_id, unsigned
 )
 from .binary import GenericObjectMeta
+from .monitoring import _EventListeners
 
 
 __all__ = ['Client']
@@ -76,7 +77,8 @@ class BaseClient:
     _identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
     _ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
 
-    def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
+    def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
+                 event_listeners: Optional[Sequence] = None, **kwargs):
         self._compact_footer = compact_footer
         self._partition_aware = partition_aware
         self._connection_args = kwargs
@@ -87,6 +89,7 @@ class BaseClient:
         self.affinity_version = (0, 0)
         self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
         self._protocol_context = None
+        self._event_listeners = _EventListeners(event_listeners)
 
     @property
     def protocol_context(self):
@@ -338,7 +341,8 @@ class Client(BaseClient):
     Synchronous Client implementation.
     """
 
-    def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
+    def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
+                 event_listeners: Optional[Sequence] = None, **kwargs):
         """
         Initialize client.
 
@@ -349,9 +353,10 @@ class Client(BaseClient):
          https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
         :param partition_aware: (optional) try to calculate the exact data
          placement from the key before to issue the key operation to the
-         server node, `True` by default.
+         server node, `True` by default,
+        :param event_listeners: (optional) a sequence of event listeners (see :mod:`pyignite.monitoring`).
         """
-        super().__init__(compact_footer, partition_aware, **kwargs)
+        super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
 
     def connect(self, *args):
         """
@@ -382,7 +387,6 @@ class Client(BaseClient):
                         self._current_node = i
 
             except connection_errors:
-                conn.failed = True
                 if self.partition_aware:
                     # schedule the reconnection
                     conn.reconnect()
@@ -565,7 +569,7 @@ class Client(BaseClient):
         """
         for _ in range(AFFINITY_RETRIES or 1):
             result = cache_get_node_partitions(conn, caches)
-            if result.status == 0 and result.value['partition_mapping']:
+            if result.status == 0:
                 break
             time.sleep(AFFINITY_DELAY)
 
@@ -608,9 +612,9 @@ class Client(BaseClient):
 
                 self._update_affinity(full_affinity)
 
-                for conn in self._nodes:
-                    if not conn.alive:
-                        conn.reconnect()
+                for node in self._nodes:
+                    if not node.alive:
+                        node.reconnect()
 
             c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
             parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index c5fa24d..89de49d 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -190,10 +190,10 @@ class AioConnection(BaseConnection):
             self._on_handshake_fail(e)
             raise e
         except Exception as e:
+            self._on_handshake_fail(e)
             # restore undefined protocol version
             if detecting_protocol:
                 self.client.protocol_context = None
-            self._on_handshake_fail(e)
             raise e
 
         self._on_handshake_success(result)
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index ae5587a..2b9970a 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -99,7 +99,9 @@ class BaseConnection:
     def _on_handshake_start(self):
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
-                         self.host, self.port, self.client.protocol_context)
+                         self.host, self.port, self.protocol_context)
+        if self._enabled_connection_listener:
+            self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)
 
     def _on_handshake_success(self, result):
         features = BitmaskFeature.from_array(result.get('features', None))
@@ -109,24 +111,45 @@ class BaseConnection:
 
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
-                         self.host, self.port, self.uuid, self.client.protocol_context)
+                         self.host, self.port, self.uuid, self.protocol_context)
+        if self._enabled_connection_listener:
+            self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)
 
     def _on_handshake_fail(self, err):
+        self.failed = True
+
         if isinstance(err, AuthenticationError):
             logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
                          self.host, self.port, err)
+            if self._enabled_connection_listener:
+                self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
         else:
             logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
                          "with protocol context %s failed: %s",
-                         self.host, self.port, self.client.protocol_context, err, exc_info=True)
+                         self.host, self.port, self.protocol_context, err, exc_info=True)
+            if self._enabled_connection_listener:
+                self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)
 
     def _on_connection_lost(self, err=None, expected=False):
-        if expected and logger.isEnabledFor(logging.DEBUG):
-            logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
-                         self.host, self.port, self.uuid)
+        if expected:
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
+                             self.host, self.port, self.uuid)
+            if self._enabled_connection_listener:
+                self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
         else:
             logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
                         self.host, self.port, self.uuid, err)
+            if self._enabled_connection_listener:
+                self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)
+
+    @property
+    def _enabled_connection_listener(self):
+        return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener
+
+    @property
+    def _connection_listener(self):
+        return self.client._event_listeners
 
 
 class Connection(BaseConnection):
@@ -216,10 +239,10 @@ class Connection(BaseConnection):
             self._on_handshake_fail(e)
             raise e
         except Exception as e:
+            self._on_handshake_fail(e)
             # restore undefined protocol version
             if detecting_protocol:
                 self.client.protocol_context = None
-            self._on_handshake_fail(e)
             raise e
 
         self._on_handshake_success(result)
@@ -260,7 +283,7 @@ class Connection(BaseConnection):
         if self.alive:
             return
 
-        self.close()
+        self.close(on_reconnect=True)
 
         # connect and silence the connection errors
         try:
@@ -352,7 +375,7 @@ class Connection(BaseConnection):
 
         return data
 
-    def close(self):
+    def close(self, on_reconnect=False):
         """
         Try to mark socket closed, then unlink it. This is recommended but
         not required, since sockets are automatically closed when
@@ -364,5 +387,6 @@ class Connection(BaseConnection):
                 self._socket.close()
             except connection_errors:
                 pass
-            self._on_connection_lost(expected=True)
+            if not on_reconnect and not self.failed:
+                self._on_connection_lost(expected=True)
             self._socket = None
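
The hooks above publish connection lifecycle events only when at least one
``ConnectionEventListener`` is registered, so the fast path stays unchanged. For
illustration (a sketch, not part of this commit), a listener that tracks connection
health could look like::

    import threading

    from pyignite.monitoring import ConnectionEventListener

    class ConnectionHealthListener(ConnectionEventListener):
        """Counts established and lost connections, e.g. to feed a metrics gauge."""

        def __init__(self):
            self._lock = threading.Lock()  # listeners are invoked from client threads
            self.established = 0
            self.lost = 0

        def on_handshake_success(self, event):
            with self._lock:
                self.established += 1

        def on_connection_lost(self, event):
            with self._lock:
                self.lost += 1

Listeners run synchronously on the calling thread, so they should return quickly.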
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index 58f509e..ba6d9e4 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -44,6 +44,9 @@ class ProtocolContext:
         if not self.is_feature_flags_supported():
             self._features = None
 
+    def copy(self):
+        return ProtocolContext(self.version, self.features)
+
     @property
     def version(self):
         return getattr(self, '_version', None)
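
``copy()`` exists so that handshake events can carry a snapshot of the protocol
context (see ``_HandshakeEvent.__init__`` in ``pyignite/monitoring.py`` below): later
protocol negotiation then cannot retroactively change an event that was already
delivered to listeners. Roughly (``conn`` here is a hypothetical live connection)::

    ctx = conn.protocol_context   # live negotiation state
    snapshot = ctx.copy()         # independent copy attached to the event
    assert snapshot is not ctx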
diff --git a/pyignite/monitoring.py b/pyignite/monitoring.py
new file mode 100644
index 0000000..9bbfd20
--- /dev/null
+++ b/pyignite/monitoring.py
@@ -0,0 +1,457 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Tools to monitor client's events.
+
+For example, a simple query logger might be implemented like this::
+
+    import logging
+
+    from pyignite import monitoring
+
+    class QueryLogger(monitoring.QueryEventListener):
+
+        def on_query_start(self, event):
+            logging.info(f"Query {event.op_name} with query id "
+                         f"{event.query_id} started on server "
+                         f"{event.host}:{event.port}")
+
+        def on_query_fail(self, event):
+            logging.info(f"Query {event.op_name} with query id "
+                         f"{event.query_id} on server "
+                         f"{event.host}:{event.port} "
+                         f"failed in {event.duration}ms "
+                         f"with error {event.error_msg}")
+
+        def on_query_success(self, event):
+            logging.info(f"Query {event.op_name} with query id "
+                         f"{event.query_id} on server "
+                         f"{event.host}:{event.port} "
+                         f"succeeded in {event.duration}ms")
+
+:class:`~ConnectionEventListener` is also available.
+
+Event listeners can be registered by passing the ``event_listeners`` parameter to the
+:class:`~pyignite.client.Client` or :class:`~pyignite.aio_client.AioClient` constructor::
+
+    client = Client(event_listeners=[QueryLogger()])
+    with client.connect('127.0.0.1', 10800):
+        ....
+
+.. note:: Events are delivered **synchronously**. Application threads block while
+  event handlers run, so make sure your event handlers are efficient enough
+  not to adversely affect overall application performance.
+
+.. note:: Debug logging is also available through the standard ``logging`` module:
+    just set the ``DEBUG`` level on the *pyignite* logger.
+|
+|
+"""
+from typing import Optional, Sequence
+
+
+class _BaseEvent:
+    def __init__(self, **kwargs):
+        if kwargs:
+            for k, v in kwargs.items():
+                object.__setattr__(self, k, v)
+
+    def __setattr__(self, name, value):
+        raise TypeError(f'{self.__class__.__name__} is immutable')
+
+    def __repr__(self):
+        pass
+
+
+class _ConnectionEvent(_BaseEvent):
+    __slots__ = ('host', 'port')
+    host: str
+    port: int
+
+    def __init__(self, host, port, **kwargs):
+        super().__init__(host=host, port=port, **kwargs)
+
+
+class _HandshakeEvent(_ConnectionEvent):
+    __slots__ = ('protocol_context',)
+    protocol_context: Optional['ProtocolContext']
+
+    def __init__(self, host, port, protocol_context=None, **kwargs):
+        super().__init__(host, port, protocol_context=protocol_context.copy() if protocol_context else None, **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"protocol_context={self.protocol_context})"
+
+
+class HandshakeStartEvent(_HandshakeEvent):
+    """
+    Published when a handshake starts.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar protocol_context: Client's protocol context.
+    """
+    def __init__(self, host, port, protocol_context=None, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host, port, protocol_context, **kwargs)
+
+
+class HandshakeFailedEvent(_HandshakeEvent):
+    """
+    Published when a handshake fails.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar protocol_context: Client's protocol context,
+    :ivar error_msg: Error message.
+    """
+    __slots__ = ('error_msg',)
+    error_msg: str
+
+    def __init__(self, host, port, protocol_context=None, err=None, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host, port, protocol_context, error_msg=repr(err) if err else '', **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"protocol_context={self.protocol_context}, error_msg={self.error_msg})"
+
+
+class AuthenticationFailedEvent(HandshakeFailedEvent):
+    """
+    Published when authentication fails.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar protocol_context: Client protocol context,
+    :ivar error_msg: Error message.
+    """
+    pass
+
+
+class HandshakeSuccessEvent(_HandshakeEvent):
+    """
+    Published when a handshake succeeds.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar protocol_context: Client's protocol context,
+    :ivar node_uuid: Node's uuid, string.
+    """
+    __slots__ = ('node_uuid',)
+    node_uuid: str
+
+    def __init__(self, host, port, protocol_context, node_uuid, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host, port, protocol_context, node_uuid=str(node_uuid) if node_uuid else '', **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"node_uuid={self.node_uuid}, protocol_context={self.protocol_context})"
+
+
+class ConnectionClosedEvent(_ConnectionEvent):
+    """
+    Published when a connection to the node is closed as expected.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar node_uuid: Node's uuid, string.
+    """
+    __slots__ = ('node_uuid',)
+    node_uuid: str
+
+    def __init__(self, host, port, node_uuid, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host, port, node_uuid=str(node_uuid) if node_uuid else '', **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, node_uuid={self.node_uuid})"
+
+
+class ConnectionLostEvent(ConnectionClosedEvent):
+    """
+    Published when a connection to the node is lost.
+
+    :ivar host: Address of the node to connect,
+    :ivar port: Port number of the node to connect,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar error_msg: Error message.
+    """
+    __slots__ = ('error_msg',)
+    node_uuid: str
+    error_msg: str
+
+    def __init__(self, host, port, node_uuid, err, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host, port, node_uuid, error_msg=repr(err) if err else '', **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"node_uuid={self.node_uuid}, error_msg={self.error_msg})"
+
+
+class _EventListener:
+    pass
+
+
+class ConnectionEventListener(_EventListener):
+    """
+    Base class for connection event listeners.
+    """
+    def on_handshake_start(self, event: HandshakeStartEvent):
+        """
+        Handle handshake start event.
+
+        :param event: Instance of :class:`HandshakeStartEvent`.
+        """
+        pass
+
+    def on_handshake_success(self, event: HandshakeSuccessEvent):
+        """
+        Handle handshake success event.
+
+        :param event: Instance of :class:`HandshakeSuccessEvent`.
+        """
+        pass
+
+    def on_handshake_fail(self, event: HandshakeFailedEvent):
+        """
+        Handle handshake failed event.
+
+        :param event: Instance of :class:`HandshakeFailedEvent`.
+        """
+        pass
+
+    def on_authentication_fail(self, event: AuthenticationFailedEvent):
+        """
+        Handle authentication failed event.
+
+        :param event: Instance of :class:`AuthenticationFailedEvent`.
+        """
+        pass
+
+    def on_connection_closed(self, event: ConnectionClosedEvent):
+        """
+        Handle connection closed event.
+
+        :param event: Instance of :class:`ConnectionClosedEvent`.
+        """
+        pass
+
+    def on_connection_lost(self, event: ConnectionLostEvent):
+        """
+        Handle connection lost event.
+
+        :param event: Instance of :class:`ConnectionLostEvent`.
+        """
+        pass
+
+
+class _QueryEvent(_BaseEvent):
+    __slots__ = ('host', 'port', 'node_uuid', 'query_id', 'op_code', 'op_name')
+    host: str
+    port: int
+    node_uuid: str
+    query_id: int
+    op_code: int
+    op_name: str
+
+    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, **kwargs):
+        """
+        This class is not supposed to be constructed by the user.
+        """
+        super().__init__(host=host, port=port, node_uuid=str(node_uuid) if node_uuid else '',
+                         query_id=query_id, op_code=op_code, op_name=op_name, **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \
+               f"op_code={self.op_code}, op_name={self.op_name})"
+
+
+class QueryStartEvent(_QueryEvent):
+    """
+    Published when a client's query starts.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name.
+    """
+    pass
+
+
+class QuerySuccessEvent(_QueryEvent):
+    """
+    Published when a client's query finishes successfully.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name,
+    :ivar duration: Query's duration in milliseconds.
+    """
+    __slots__ = ('duration', )
+    duration: int
+
+    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, **kwargs):
+        super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration, **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"node_uuid={self.node_uuid}, query_id={self.query_id}, " \
+               f"op_code={self.op_code}, op_name={self.op_name}, duration={self.duration})"
+
+
+class QueryFailEvent(_QueryEvent):
+    """
+    Published when a client's query fails.
+
+    :ivar host: Address of the node on which the query is executed,
+    :ivar port: Port number of the node on which the query is executed,
+    :ivar node_uuid: Node's uuid, string,
+    :ivar query_id: Query's id,
+    :ivar op_code: Operation's id,
+    :ivar op_name: Operation's name,
+    :ivar duration: Query's duration in milliseconds,
+    :ivar err_msg: Error message.
+    """
+    __slots__ = ('duration', 'err_msg')
+    duration: int
+    err_msg: str
+
+    def __init__(self, host, port, node_uuid, query_id, op_code, op_name, duration, err, **kwargs):
+        super().__init__(host, port, node_uuid, query_id, op_code, op_name, duration=duration,
+                         err_msg=repr(err) if err else '', **kwargs)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(host={self.host}, port={self.port}, " \
+               f"node_uuid={self.node_uuid}, query_id={self.query_id}, op_code={self.op_code}, " \
+               f"op_name={self.op_name}, duration={self.duration}, err_msg={self.err_msg})"
+
+
+class QueryEventListener(_EventListener):
+    """
+    Base class for query event listeners.
+    """
+    def on_query_start(self, event: QueryStartEvent):
+        """
+        Handle query start event.
+
+        :param event: Instance of :class:`QueryStartEvent`.
+        """
+        pass
+
+    def on_query_success(self, event: QuerySuccessEvent):
+        """
+        Handle query success event.
+
+        :param event: Instance of :class:`QuerySuccessEvent`.
+        """
+        pass
+
+    def on_query_fail(self, event: QueryFailEvent):
+        """
+        Handle query fail event.
+
+        :param event: Instance of :class:`QueryFailEvent`.
+        """
+        pass
+
+
+class _EventListeners:
+    def __init__(self, listeners: Optional[Sequence]):
+        self.__connection_listeners = []
+        self.__query_listeners = []
+        if listeners:
+            for listener in listeners:
+                if isinstance(listener, ConnectionEventListener):
+                    self.__connection_listeners.append(listener)
+                elif isinstance(listener, QueryEventListener):
+                    self.__query_listeners.append(listener)
+
+    @property
+    def enabled_connection_listener(self):
+        return bool(self.__connection_listeners)
+
+    @property
+    def enabled_query_listener(self):
+        return bool(self.__query_listeners)
+
+    def publish_handshake_start(self, host, port, protocol_context):
+        evt = HandshakeStartEvent(host, port, protocol_context)
+        self.__publish_connection_events(lambda listener: listener.on_handshake_start(evt))
+
+    def publish_handshake_success(self, host, port, protocol_context, node_uuid):
+        evt = HandshakeSuccessEvent(host, port, protocol_context, node_uuid)
+        self.__publish_connection_events(lambda listener: listener.on_handshake_success(evt))
+
+    def publish_handshake_fail(self, host, port, protocol_context, err):
+        evt = HandshakeFailedEvent(host, port, protocol_context, err)
+        self.__publish_connection_events(lambda listener: listener.on_handshake_fail(evt))
+
+    def publish_authentication_fail(self, host, port, protocol_context, err):
+        evt = AuthenticationFailedEvent(host, port, protocol_context, err)
+        self.__publish_connection_events(lambda listener: listener.on_authentication_fail(evt))
+
+    def publish_connection_closed(self, host, port, node_uuid):
+        evt = ConnectionClosedEvent(host, port, node_uuid)
+        self.__publish_connection_events(lambda listener: listener.on_connection_closed(evt))
+
+    def publish_connection_lost(self, host, port, node_uuid, err):
+        evt = ConnectionLostEvent(host, port, node_uuid, err)
+        self.__publish_connection_events(lambda listener: listener.on_connection_lost(evt))
+
+    def publish_query_start(self, host, port, node_uuid, query_id, op_code, op_name):
+        evt = QueryStartEvent(host, port, node_uuid, query_id, op_code, op_name)
+        self.__publish_query_events(lambda listener: listener.on_query_start(evt))
+
+    def publish_query_success(self, host, port, node_uuid, query_id, op_code, op_name, duration):
+        evt = QuerySuccessEvent(host, port, node_uuid, query_id, op_code, op_name, duration)
+        self.__publish_query_events(lambda listener: listener.on_query_success(evt))
+
+    def publish_query_fail(self, host, port, node_uuid, query_id, op_code, op_name, duration, err):
+        evt = QueryFailEvent(host, port, node_uuid, query_id, op_code, op_name, duration, err)
+        self.__publish_query_events(lambda listener: listener.on_query_fail(evt))
+
+    def __publish_connection_events(self, callback):
+        try:
+            for listener in self.__connection_listeners:
+                callback(listener)
+        except: # noqa: 13
+            pass
+
+    def __publish_query_events(self, callback):
+        try:
+            for listener in self.__query_listeners:
+                callback(listener)
+        except: # noqa: 13
+            pass
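
Event objects are slot-based and immutable: ``_BaseEvent.__setattr__`` raises
``TypeError``, so a listener can safely hand events to other threads. A quick
illustration (events are normally constructed by the client, not by user code)::

    from pyignite.monitoring import HandshakeStartEvent

    evt = HandshakeStartEvent('127.0.0.1', 10800)
    print(evt)    # HandshakeStartEvent(host=127.0.0.1, port=10800, protocol_context=None)

    try:
        evt.host = 'example.com'
    except TypeError as e:
        print(e)  # HandshakeStartEvent is immutable

Note also that ``_EventListeners`` wraps dispatch in a bare ``except`` and swallows
listener errors, so a faulty listener cannot break client operations (though it will
also skip the remaining listeners of the same category for that event).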
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 89c354e..c141b26 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -227,12 +227,25 @@ class Query:
         # build result
         return APIResult(response)
 
+    @staticmethod
+    def _enabled_query_listener(conn):
+        client = conn.client
+        return client._event_listeners and client._event_listeners.enabled_query_listener
+
+    @staticmethod
+    def _event_listener(conn):
+        return conn.client._event_listeners
+
     def _on_query_started(self, conn):
+        self._start_ts = time.monotonic()
         if logger.isEnabledFor(logging.DEBUG):
-            self._start_ts = time.monotonic()
             logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)",
                          self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid)
 
+        if self._enabled_query_listener(conn):
+            self._event_listener(conn).publish_query_start(conn.host, conn.port, conn.uuid, self.query_id,
+                                                           self.op_code, _get_op_code_name(self.op_code))
+
     def _on_query_finished(self, conn, result=None, err=None):
         if logger.isEnabledFor(logging.DEBUG):
             dur_ms = _sec_to_millis(time.monotonic() - self._start_ts)
@@ -240,12 +253,20 @@ class Query:
                 err = result.message
             if err:
                 logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
-                             "in %.3f ms: %s", self.query_id, _get_op_code_name(self.op_code),
+                             "in %d ms: %s", self.query_id, _get_op_code_name(self.op_code),
                              conn.host, conn.port, conn.uuid, dur_ms, err)
+                if self._enabled_query_listener(conn):
+                    self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id,
+                                                                  self.op_code, _get_op_code_name(self.op_code),
+                                                                  dur_ms, err)
             else:
                 logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) "
-                             "successfully in %.3f ms", self.query_id, _get_op_code_name(self.op_code),
+                             "successfully in %d ms", self.query_id, _get_op_code_name(self.op_code),
                              conn.host, conn.port, conn.uuid, dur_ms)
+                if self._enabled_query_listener(conn):
+                    self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id,
+                                                                     self.op_code, _get_op_code_name(self.op_code),
+                                                                     dur_ms)
 
 
 class ConfigQuery(Query):
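
Since ``QuerySuccessEvent`` and ``QueryFailEvent`` carry the duration in milliseconds,
a query listener can aggregate per-operation latency. A sketch (names are illustrative,
not part of this commit)::

    from collections import defaultdict

    from pyignite.monitoring import QueryEventListener

    class LatencyListener(QueryEventListener):
        """Accumulates total duration and call count per operation name."""

        def __init__(self):
            self.stats = defaultdict(lambda: [0, 0])  # op_name -> [total_ms, count]

        def on_query_success(self, event):
            entry = self.stats[event.op_name]
            entry[0] += event.duration
            entry[1] += 1

        def mean_ms(self, op_name):
            total, count = self.stats[op_name]
            return total / count if count else 0.0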
diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py
deleted file mode 100644
index 8a2f98e..0000000
--- a/pyignite/stream/aio_cluster.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-This module contains `AioCluster` that lets you get info and change state of the
-whole cluster.
-"""
-from pyignite import AioClient
-from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
-
-
-class AioCluster:
-    """
-    Ignite cluster abstraction. Users should never use this class directly,
-    but construct its instances with
-    :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
-    """
-
-    def __init__(self, client: 'AioClient'):
-        self._client = client
-
-    async def get_state(self):
-        """
-        Gets current cluster state.
-
-        :return: Current cluster state. This is one of ClusterState.INACTIVE,
-         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
-        """
-        return await cluster_get_state_async(await self._client.random_node())
-
-    async def set_state(self, state):
-        """
-        Changes current cluster state to the given.
-
-        Note: Deactivation clears in-memory caches (without persistence)
-         including the system caches.
-
-        :param state: New cluster state. This is one of ClusterState.INACTIVE,
-         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
-        """
-        return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py
index 3097991..c9a6b60 100644
--- a/tests/affinity/test_affinity.py
+++ b/tests/affinity/test_affinity.py
@@ -36,7 +36,6 @@ from tests.util import wait_for_condition, wait_for_condition_async
 
 def test_get_node_partitions(client, caches):
     cache_ids = [cache.cache_id for cache in caches]
-    __wait_for_ready_affinity(client, cache_ids)
     mappings = __get_mappings(client, cache_ids)
     __check_mappings(mappings, cache_ids)
 
@@ -44,7 +43,6 @@ def test_get_node_partitions(client, caches):
 @pytest.mark.asyncio
 async def test_get_node_partitions_async(async_client, async_caches):
     cache_ids = [cache.cache_id for cache in async_caches]
-    await __wait_for_ready_affinity(async_client, cache_ids)
     mappings = await __get_mappings(async_client, cache_ids)
     __check_mappings(mappings, cache_ids)
 
@@ -157,6 +155,7 @@ def __create_caches_fixture(client):
         caches = []
         try:
             caches = generate_caches()
+            __wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
             yield caches
         finally:
             for cache in caches:
@@ -166,6 +165,7 @@ def __create_caches_fixture(client):
         caches = []
         try:
             caches = await generate_caches()
+            await __wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
             yield caches
         finally:
             await asyncio.gather(*[cache.destroy() for cache in caches])
@@ -180,6 +180,7 @@ def cache(client):
         PROP_CACHE_MODE: CacheMode.PARTITIONED,
     })
     try:
+        __wait_for_ready_affinity(client, [cache.cache_id])
         yield cache
     finally:
         cache.destroy()
@@ -192,6 +193,7 @@ async def async_cache(async_client):
         PROP_CACHE_MODE: CacheMode.PARTITIONED,
     })
     try:
+        await __wait_for_ready_affinity(async_client, [cache.cache_id])
         yield cache
     finally:
         await cache.destroy()
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 0d0ec24..b73eff3 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -22,11 +22,10 @@ import pytest
 
 from pyignite import GenericObjectMeta, AioClient, Client
 from pyignite.aio_cache import AioCache
-from pyignite.connection import Connection, AioConnection
-from pyignite.constants import PROTOCOL_BYTE_ORDER
 from pyignite.datatypes import String, LongObject
 from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER, PROP_CACHE_KEY_CONFIGURATION, PROP_CACHE_MODE
+from pyignite.monitoring import QueryEventListener
 from tests.util import wait_for_condition, wait_for_condition_async, start_ignite, kill_process_tree
 
 try:
@@ -35,41 +34,37 @@ except ImportError:
     from async_generator import asynccontextmanager
 
 requests = deque()
-old_send = Connection.send
-old_send_async = AioConnection._send
 
 
-def patched_send(self, *args, **kwargs):
-    """Patched send function that push to queue idx of server to which request is routed."""
-    buf = args[0]
-    if buf and len(buf) >= 6:
-        op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
-        # Filter only caches operation.
-        if 1000 <= op_code < 1100:
-            requests.append(self.port % 100)
-    return old_send(self, *args, **kwargs)
+class QueryRouteListener(QueryEventListener):
+    def on_query_start(self, event):
+        if 1000 <= event.op_code < 1100:
+            requests.append(event.port % 100)
 
 
-async def patched_send_async(self, *args, **kwargs):
-    """Patched send function that push to queue idx of server to which request is routed."""
-    buf = args[1]
-    if buf and len(buf) >= 6:
-        op_code = int.from_bytes(buf[4:6], byteorder=PROTOCOL_BYTE_ORDER)
-        # Filter only caches operation.
-        if 1000 <= op_code < 1100:
-            requests.append(self.port % 100)
-    return await old_send_async(self, *args, **kwargs)
+client_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)]
 
 
-def setup_function():
-    requests.clear()
-    Connection.send = patched_send
-    AioConnection._send = patched_send_async
+@pytest.fixture
+def client():
+    client = Client(partition_aware=True, event_listeners=[QueryRouteListener()])
+    try:
+        client.connect(client_connection_string)
+        yield client
+    finally:
+        requests.clear()
+        client.close()
 
 
-def teardown_function():
-    Connection.send = old_send
-    AioConnection.send = old_send_async
+@pytest.fixture
+async def async_client(event_loop):
+    client = AioClient(partition_aware=True, event_listeners=[QueryRouteListener()])
+    try:
+        await client.connect(client_connection_string)
+        yield client
+    finally:
+        requests.clear()
+        await client.close()
 
 
 def wait_for_affinity_distribution(cache, key, node_idx, timeout=30):
@@ -112,7 +107,8 @@ async def wait_for_affinity_distribution_async(cache, key, node_idx, timeout=30)
 
 @pytest.mark.parametrize("key,grid_idx", [(1, 1), (2, 2), (3, 3), (4, 1), (5, 1), (6, 2), (11, 1), (13, 1), (19, 1)])
 @pytest.mark.parametrize("backups", [0, 1, 2, 3])
-def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups, client):
+def test_cache_operation_on_primitive_key_routes_request_to_primary_node(request, key, grid_idx, backups,
+                                                                         client):
     cache = client.get_or_create_cache({
         PROP_NAME: request.node.name + str(backups),
         PROP_BACKUPS_NUMBER: backups,
@@ -210,47 +206,24 @@ def test_cache_operation_on_custom_affinity_key_routes_request_to_primary_node(r
     assert requests.pop() == grid_idx
 
 
-client_routed_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1, 5)]
-
-
 @pytest.fixture
-def client_routed():
-    client = Client(partition_aware=True)
-    try:
-        client.connect(client_routed_connection_string)
-        yield client
-    finally:
-        client.close()
-
-
-@pytest.fixture
-def client_routed_cache(client_routed, request):
-    yield client_routed.get_or_create_cache(request.node.name)
+def client_cache(client, request):
+    yield client.get_or_create_cache(request.node.name)
 
 
 @pytest.fixture
-async def async_client_routed(event_loop):
-    client = AioClient(partition_aware=True)
-    try:
-        await client.connect(client_routed_connection_string)
-        yield client
-    finally:
-        await client.close()
-
-
-@pytest.fixture
-async def async_client_routed_cache(async_client_routed, request):
-    cache = await async_client_routed.get_or_create_cache(request.node.name)
+async def async_client_cache(async_client, request):
+    cache = await async_client.get_or_create_cache(request.node.name)
     yield cache
 
 
-def test_cache_operation_routed_to_new_cluster_node(client_routed_cache):
-    __perform_cache_operation_routed_to_new_node(client_routed_cache)
+def test_cache_operation_routed_to_new_cluster_node(client_cache):
+    __perform_cache_operation_routed_to_new_node(client_cache)
 
 
 @pytest.mark.asyncio
-async def test_cache_operation_routed_to_new_cluster_node_async(async_client_routed_cache):
-    await __perform_cache_operation_routed_to_new_node(async_client_routed_cache)
+async def test_cache_operation_routed_to_new_cluster_node_async(async_client_cache):
+    await __perform_cache_operation_routed_to_new_node(async_client_cache)
 
 
 def __perform_cache_operation_routed_to_new_node(cache):
@@ -328,6 +301,55 @@ async def test_replicated_cache_operation_routed_to_random_node_async(async_repl
     await verify_random_node(async_replicated_cache)
 
 
+def test_replicated_cache_operation_not_routed_to_failed_node(replicated_cache):
+    srv = start_ignite(idx=4)
+    try:
+        while True:
+            replicated_cache.put(1, 1)
+
+            if requests.pop() == 4:
+                break
+
+        kill_process_tree(srv.pid)
+
+        num_failures = 0
+        for i in range(100):
+            # The request may fail at most once, because a query can be issued before
+            # the affinity is updated or the connection loss is detected.
+            try:
+                replicated_cache.put(1, 1)
+            except:  # noqa 13
+                num_failures += 1
+                assert num_failures <= 1, "Expected no more than 1 failure."
+    finally:
+        kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_replicated_cache_operation_not_routed_to_failed_node_async(async_replicated_cache):
+    srv = start_ignite(idx=4)
+    try:
+        while True:
+            await async_replicated_cache.put(1, 1)
+
+            if requests.pop() == 4:
+                break
+
+        kill_process_tree(srv.pid)
+
+        num_failures = 0
+        for i in range(100):
+            # The request may fail at most once, because a query can be issued before
+            # the affinity is updated or the connection loss is detected.
+            try:
+                await async_replicated_cache.put(1, 1)
+            except:  # noqa 13
+                num_failures += 1
+                assert num_failures <= 1, "Expected no more than 1 failure."
+    finally:
+        kill_process_tree(srv.pid)
+
+
 def verify_random_node(cache):
     key = 1
 
@@ -423,8 +445,8 @@ async def test_new_registered_cache_affinity_async(async_client):
             assert requests.pop() == 3
 
 
-def test_all_registered_cache_updated_on_new_server(client_routed):
-    with create_caches(client_routed) as caches:
+def test_all_registered_cache_updated_on_new_server(client):
+    with create_caches(client) as caches:
         key = 12
         test_cache = random.choice(caches)
         wait_for_affinity_distribution(test_cache, key, 3)
@@ -444,8 +466,8 @@ def test_all_registered_cache_updated_on_new_server(client_routed):
 
 
 @pytest.mark.asyncio
-async def test_all_registered_cache_updated_on_new_server_async(async_client_routed):
-    async with create_caches_async(async_client_routed) as caches:
+async def test_all_registered_cache_updated_on_new_server_async(async_client):
+    async with create_caches_async(async_client) as caches:
         key = 12
         test_cache = random.choice(caches)
         await wait_for_affinity_distribution_async(test_cache, key, 3)
diff --git a/tests/common/test_query_listener.py b/tests/common/test_query_listener.py
new file mode 100644
index 0000000..afff542
--- /dev/null
+++ b/tests/common/test_query_listener.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.exceptions import CacheError
+from pyignite.monitoring import QueryEventListener, QueryStartEvent, QueryFailEvent, QuerySuccessEvent
+from pyignite.queries.op_codes import OP_CACHE_PUT, OP_CACHE_PARTITIONS, OP_CLUSTER_GET_STATE
+
+events = []
+
+
+class QueryRouteListener(QueryEventListener):
+    def on_query_start(self, event):
+        if event.op_code != OP_CACHE_PARTITIONS:
+            events.append(event)
+
+    def on_query_fail(self, event):
+        if event.op_code != OP_CACHE_PARTITIONS:
+            events.append(event)
+
+    def on_query_success(self, event):
+        if event.op_code != OP_CACHE_PARTITIONS:
+            events.append(event)
+
+
+@pytest.fixture
+def client():
+    client = Client(event_listeners=[QueryRouteListener()])
+    try:
+        client.connect('127.0.0.1', 10801)
+        yield client
+    finally:
+        client.close()
+        events.clear()
+
+
+@pytest.fixture
+async def async_client(event_loop):
+    client = AioClient(event_listeners=[QueryRouteListener()])
+    try:
+        await client.connect('127.0.0.1', 10801)
+        yield client
+    finally:
+        await client.close()
+        events.clear()
+
+
+def test_query_fail_events(request, client):
+    with pytest.raises(CacheError):
+        cache = client.get_cache(request.node.name)
+        cache.put(1, 1)
+
+    __assert_fail_events(client)
+
+
+@pytest.mark.asyncio
+async def test_query_fail_events_async(request, async_client):
+    with pytest.raises(CacheError):
+        cache = await async_client.get_cache(request.node.name)
+        await cache.put(1, 1)
+
+    __assert_fail_events(async_client)
+
+
+def __assert_fail_events(client):
+    assert len(events) == 2
+    conn = client._nodes[0]
+    for ev in events:
+        if isinstance(ev, QueryStartEvent):
+            assert ev.op_code == OP_CACHE_PUT
+            assert ev.op_name == 'OP_CACHE_PUT'
+            assert ev.host == conn.host
+            assert ev.port == conn.port
+            assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+
+        if isinstance(ev, QueryFailEvent):
+            assert ev.op_code == OP_CACHE_PUT
+            assert ev.op_name == 'OP_CACHE_PUT'
+            assert ev.host == conn.host
+            assert ev.port == conn.port
+            assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+            assert 'Cache does not exist' in ev.err_msg
+            assert ev.duration > 0
+
+
+def test_query_success_events(client):
+    client.get_cluster().get_state()
+    __assert_success_events(client)
+
+
+@pytest.mark.asyncio
+async def test_query_success_events_async(async_client):
+    await async_client.get_cluster().get_state()
+    __assert_success_events(async_client)
+
+
+def __assert_success_events(client):
+    assert len(events) == 2
+    conn = client._nodes[0]
+    for ev in events:
+        if isinstance(ev, QueryStartEvent):
+            assert ev.op_code == OP_CLUSTER_GET_STATE
+            assert ev.op_name == 'OP_CLUSTER_GET_STATE'
+            assert ev.host == conn.host
+            assert ev.port == conn.port
+            assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+
+        if isinstance(ev, QuerySuccessEvent):
+            assert ev.op_code == OP_CLUSTER_GET_STATE
+            assert ev.op_name == 'OP_CLUSTER_GET_STATE'
+            assert ev.host == conn.host
+            assert ev.port == conn.port
+            assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+            assert ev.duration > 0
diff --git a/tests/custom/test_connection_events.py b/tests/custom/test_connection_events.py
new file mode 100644
index 0000000..bee9395
--- /dev/null
+++ b/tests/custom/test_connection_events.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import random
+
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.monitoring import ConnectionEventListener, ConnectionLostEvent, ConnectionClosedEvent, \
+    HandshakeSuccessEvent, HandshakeFailedEvent, HandshakeStartEvent
+
+from tests.util import start_ignite_gen, kill_process_tree
+
+
+@pytest.fixture(autouse=True)
+def server1():
+    yield from start_ignite_gen(idx=1)
+
+
+@pytest.fixture(autouse=True)
+def server2():
+    yield from start_ignite_gen(idx=2)
+
+
+events = []
+
+
+def teardown_function():
+    events.clear()
+
+
+class RecordingConnectionEventListener(ConnectionEventListener):
+    def on_handshake_start(self, event):
+        events.append(event)
+
+    def on_handshake_success(self, event):
+        events.append(event)
+
+    def on_handshake_fail(self, event):
+        events.append(event)
+
+    def on_authentication_fail(self, event):
+        events.append(event)
+
+    def on_connection_closed(self, event):
+        events.append(event)
+
+    def on_connection_lost(self, event):
+        events.append(event)
+
+
+def test_events(request, server2):
+    client = Client(event_listeners=[RecordingConnectionEventListener()])
+    with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
+        protocol_context = client.protocol_context
+        nodes = {conn.port: conn for conn in client._nodes}
+        cache = client.get_or_create_cache(request.node.name)
+        kill_process_tree(server2.pid)
+
+        while True:
+            try:
+                cache.put(random.randint(0, 1000), 1)
+            except:  # noqa: E722
+                pass
+
+            if any(isinstance(e, ConnectionLostEvent) for e in events):
+                break
+
+    __assert_events(nodes, protocol_context)
+
+
+@pytest.mark.asyncio
+async def test_events_async(request, server2):
+    client = AioClient(event_listeners=[RecordingConnectionEventListener()])
+    async with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
+        protocol_context = client.protocol_context
+        nodes = {conn.port: conn for conn in client._nodes}
+        cache = await client.get_or_create_cache(request.node.name)
+        kill_process_tree(server2.pid)
+
+        while True:
+            try:
+                await cache.put(random.randint(0, 1000), 1)
+            except:  # noqa: E722
+                pass
+
+            if any(isinstance(e, ConnectionLostEvent) for e in events):
+                break
+
+    __assert_events(nodes, protocol_context)
+
+
+def __assert_events(nodes, protocol_context):
+    assert len([e for e in events if isinstance(e, ConnectionLostEvent)]) == 1
+    # ConnectionLostEvent is a subclass of ConnectionClosedEvent
+    assert len([e for e in events if type(e) is ConnectionClosedEvent]) == 1
+    assert len([e for e in events if isinstance(e, HandshakeSuccessEvent)]) == 2
+
+    for ev in events:
+        assert ev.host == '127.0.0.1'
+        if isinstance(ev, ConnectionLostEvent):
+            assert ev.port == 10802
+            assert ev.node_uuid == str(nodes[ev.port].uuid)
+            assert ev.error_msg
+        elif isinstance(ev, HandshakeStartEvent):
+            assert ev.protocol_context == protocol_context
+            assert ev.port in {10801, 10802}
+        elif isinstance(ev, HandshakeFailedEvent):
+            assert ev.port == 10802
+            assert ev.protocol_context == protocol_context
+            assert ev.error_msg
+        elif isinstance(ev, HandshakeSuccessEvent):
+            assert ev.port in {10801, 10802}
+            assert ev.node_uuid == str(nodes[ev.port].uuid)
+            assert ev.protocol_context == protocol_context
+        elif isinstance(ev, ConnectionClosedEvent):
+            assert ev.port == 10801
+            assert ev.node_uuid == str(nodes[ev.port].uuid)
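
The connection-event API exercised by this test is fully visible above:
subclass ConnectionEventListener, override only the hooks of interest, and
pass instances via the client's event_listeners argument. A minimal
standalone sketch (the address and port are illustrative):

    from pyignite import Client
    from pyignite.monitoring import ConnectionEventListener

    class AlertingListener(ConnectionEventListener):
        def on_connection_lost(self, event):
            # Fires when an established connection drops unexpectedly;
            # error_msg describes the underlying failure.
            print(f'Lost {event.host}:{event.port} '
                  f'(node {event.node_uuid}): {event.error_msg}')

        def on_connection_closed(self, event):
            # Fires on an orderly close, e.g. on client disconnect. Since
            # ConnectionLostEvent subclasses ConnectionClosedEvent,
            # __assert_events above uses a type() comparison to tell the
            # plain closed event apart from the lost one.
            print(f'Closed {event.host}:{event.port}')

    client = Client(event_listeners=[AlertingListener()])
    with client.connect([('127.0.0.1', 10800)]):
        pass  # handshake and close events are delivered along the way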
diff --git a/tests/security/conftest.py b/tests/security/conftest.py
index d5de5a1..8845c31 100644
--- a/tests/security/conftest.py
+++ b/tests/security/conftest.py
@@ -16,6 +16,7 @@ import os
 
 import pytest
 
+from pyignite import monitoring
 from tests.util import get_test_dir
 
 
@@ -47,3 +48,26 @@ def __create_ssl_param(with_password=False):
             'ssl_certfile': cert,
             'ssl_ca_certfile': cert
         }
+
+
+class AccumulatingConnectionListener(monitoring.ConnectionEventListener):
+    def __init__(self):
+        self.events = []
+
+    def on_handshake_start(self, event):
+        self.events.append(event)
+
+    def on_handshake_success(self, event):
+        self.events.append(event)
+
+    def on_handshake_fail(self, event):
+        self.events.append(event)
+
+    def on_authentication_fail(self, event):
+        self.events.append(event)
+
+    def on_connection_closed(self, event):
+        self.events.append(event)
+
+    def on_connection_lost(self, event):
+        self.events.append(event)
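
Unlike the module-level events list in test_connection_events.py, this
listener accumulates into instance state, so tests need no teardown hook to
reset shared state between runs. Typical use in a test (a sketch; the
address and assertions are illustrative):

    from pyignite import Client, monitoring

    listener = AccumulatingConnectionListener()
    client = Client(event_listeners=[listener])
    with client.connect('127.0.0.1', 10800):
        pass

    # A single connection should produce a handshake start/success pair.
    assert any(isinstance(ev, monitoring.HandshakeStartEvent) for ev in listener.events)
    assert any(isinstance(ev, monitoring.HandshakeSuccessEvent) for ev in listener.events)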
diff --git a/tests/security/test_auth.py b/tests/security/test_auth.py
index 3586c91..503cf88 100644
--- a/tests/security/test_auth.py
+++ b/tests/security/test_auth.py
@@ -18,7 +18,11 @@ import re
 import pytest
 
 from pyignite import Client, AioClient
+from pyignite.monitoring import (
+    HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent, AuthenticationFailedEvent
+)
 from pyignite.exceptions import AuthenticationError
+from tests.security.conftest import AccumulatingConnectionListener
 from tests.util import start_ignite_gen, clear_ignite_work_dir
 
 DEFAULT_IGNITE_USERNAME = 'ignite'
@@ -44,32 +48,58 @@ def cleanup():
 
 def test_auth_success(with_ssl, ssl_params, caplog):
     ssl_params['use_ssl'] = with_ssl
-    client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
+    listener = AccumulatingConnectionListener()
+    client = Client(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD,
+                    event_listeners=[listener], **ssl_params)
     with caplog.at_level(logger='pyignite', level=logging.DEBUG):
         with client.connect("127.0.0.1", 10801):
             assert all(node.alive for node in client._nodes)
+            conn = client._nodes[0]
 
-        __assert_successful_connect_log(caplog)
+        __assert_successful_connect_log(conn, caplog)
+        __assert_successful_connect_events(conn, listener)
 
 
 @pytest.mark.asyncio
 async def test_auth_success_async(with_ssl, ssl_params, caplog):
     ssl_params['use_ssl'] = with_ssl
-    client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD, **ssl_params)
+    listener = AccumulatingConnectionListener()
+    client = AioClient(username=DEFAULT_IGNITE_USERNAME, password=DEFAULT_IGNITE_PASSWORD,
+                       event_listeners=[listener], **ssl_params)
     with caplog.at_level(logger='pyignite', level=logging.DEBUG):
         async with client.connect("127.0.0.1", 10801):
             assert all(node.alive for node in client._nodes)
+            conn = client._nodes[0]
 
-        __assert_successful_connect_log(caplog)
+        __assert_successful_connect_log(conn, caplog)
+        __assert_successful_connect_events(conn, listener)
 
 
-def __assert_successful_connect_log(caplog):
-    assert any(re.match(r'Connecting to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
-    assert any(re.match(r'Connected to node\(address=127.0.0.1,\s+port=10801', r.message) for r in caplog.records)
-    assert any(re.match(r'Connection closed to node\(address=127.0.0.1,\s+port=10801', r.message)
+def __assert_successful_connect_log(conn, caplog):
+    assert any(re.match(rf'Connecting to node\(address={conn.host},\s+port={conn.port}', r.message)
+               for r in caplog.records)
+    assert any(re.match(rf'Connected to node\(address={conn.host},\s+port={conn.port}', r.message)
+               for r in caplog.records)
+    assert any(re.match(rf'Connection closed to node\(address={conn.host},\s+port={conn.port}', r.message)
                for r in caplog.records)
 
 
+def __assert_successful_connect_events(conn, listener):
+    event_classes = (HandshakeStartEvent, HandshakeSuccessEvent, ConnectionClosedEvent)
+
+    for cls in event_classes:
+        assert any(isinstance(ev, cls) for ev in listener.events)
+
+    for ev in listener.events:
+        if isinstance(ev, event_classes):
+            assert ev.host == conn.host
+            assert ev.port == conn.port
+            if isinstance(ev, (HandshakeSuccessEvent, ConnectionClosedEvent)):
+                assert ev.node_uuid == str(conn.uuid if conn.uuid else '')
+                if isinstance(ev, HandshakeSuccessEvent):
+                    assert ev.protocol_context
+
+
 auth_failed_params = [
     [DEFAULT_IGNITE_USERNAME, None],
     ['invalid_user', 'invalid_password'],
@@ -83,13 +113,15 @@ auth_failed_params = [
 )
 def test_auth_failed(username, password, with_ssl, ssl_params, caplog):
     ssl_params['use_ssl'] = with_ssl
-
+    listener = AccumulatingConnectionListener()
     with pytest.raises(AuthenticationError):
-        client = Client(username=username, password=password, **ssl_params)
+        client = Client(username=username, password=password,
+                        event_listeners=[listener], **ssl_params)
         with client.connect("127.0.0.1", 10801):
             pass
 
-        __assert_auth_failed_log(caplog)
+    __assert_auth_failed_log(caplog)
+    __assert_auth_failed_listener(listener)
 
 
 @pytest.mark.parametrize(
@@ -99,15 +131,30 @@ def test_auth_failed(username, password, with_ssl, ssl_params, caplog):
 @pytest.mark.asyncio
 async def test_auth_failed_async(username, password, with_ssl, ssl_params, caplog):
     ssl_params['use_ssl'] = with_ssl
-
+    listener = AccumulatingConnectionListener()
     with pytest.raises(AuthenticationError):
-        client = AioClient(username=username, password=password, **ssl_params)
+        client = AioClient(username=username, password=password,
+                           event_listeners=[listener], **ssl_params)
         async with client.connect("127.0.0.1", 10801):
             pass
 
-        __assert_auth_failed_log(caplog)
+    __assert_auth_failed_log(caplog)
+    __assert_auth_failed_listener(listener)
 
 
 def __assert_auth_failed_log(caplog):
     pattern = r'Authentication failed while connecting to node\(address=127.0.0.1,\s+port=10801'
-    assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)
+    assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR)
+               for r in caplog.records)
+
+
+def __assert_auth_failed_listener(listener):
+    found = False
+    for ev in listener.events:
+        if isinstance(ev, AuthenticationFailedEvent):
+            found = True
+            assert ev.host == '127.0.0.1'
+            assert ev.port == 10801
+            assert ev.protocol_context
+            assert 'AuthenticationError' in ev.error_msg
+    assert found
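
As the assertions above show, a failed login surfaces through two channels:
an AuthenticationError raised to the caller and an AuthenticationFailedEvent
delivered to registered listeners, with error_msg naming the exception. A
sketch combining both (credentials and address are illustrative):

    from pyignite import Client
    from pyignite.exceptions import AuthenticationError
    from pyignite.monitoring import ConnectionEventListener

    class AuthAuditListener(ConnectionEventListener):
        def on_authentication_fail(self, event):
            # error_msg contains the exception name, e.g. 'AuthenticationError'.
            print(f'Auth failed for {event.host}:{event.port}: {event.error_msg}')

    client = Client(username='ignite', password='wrong-password',
                    event_listeners=[AuthAuditListener()])
    try:
        with client.connect('127.0.0.1', 10800):
            pass
    except AuthenticationError:
        pass  # the listener has already observed the failure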
diff --git a/tests/security/test_ssl.py b/tests/security/test_ssl.py
index 2cbed4b..ed0808b 100644
--- a/tests/security/test_ssl.py
+++ b/tests/security/test_ssl.py
@@ -17,8 +17,9 @@ import re
 
 import pytest
 
-from pyignite import Client, AioClient
+from pyignite import Client, AioClient, monitoring
 from pyignite.exceptions import ReconnectError
+from tests.security.conftest import AccumulatingConnectionListener
 from tests.util import start_ignite_gen, get_or_create_cache, get_or_create_cache_async
 
 
@@ -76,25 +77,41 @@ invalid_params = [
 
 @pytest.mark.parametrize('invalid_ssl_params', invalid_params)
 def test_connection_error_with_incorrect_config(invalid_ssl_params, caplog):
+    listener = AccumulatingConnectionListener()
     with pytest.raises(ReconnectError):
-        client = Client(**invalid_ssl_params)
+        client = Client(event_listeners=[listener], **invalid_ssl_params)
         with client.connect([("127.0.0.1", 10801)]):
             pass
 
-        __assert_handshake_failed_log(caplog)
+    __assert_handshake_failed_log(caplog)
+    __assert_handshake_failed_listener(listener)
 
 
 @pytest.mark.parametrize('invalid_ssl_params', invalid_params)
 @pytest.mark.asyncio
 async def test_connection_error_with_incorrect_config_async(invalid_ssl_params, caplog):
+    listener = AccumulatingConnectionListener()
     with pytest.raises(ReconnectError):
-        client = AioClient(**invalid_ssl_params)
+        client = AioClient(event_listeners=[listener], **invalid_ssl_params)
         async with client.connect([("127.0.0.1", 10801)]):
             pass
 
-        __assert_handshake_failed_log(caplog)
+    __assert_handshake_failed_log(caplog)
+    __assert_handshake_failed_listener(listener)
 
 
 def __assert_handshake_failed_log(caplog):
     pattern = r'Failed to perform handshake, connection to node\(address=127.0.0.1,\s+port=10801.*failed:'
-    assert any(re.match(pattern, r.message) and r.levelname == logging.ERROR for r in caplog.records)
+    assert any(re.match(pattern, r.message) and r.levelname == logging.getLevelName(logging.ERROR)
+               for r in caplog.records)
+
+
+def __assert_handshake_failed_listener(listener):
+    found = False
+    for ev in listener.events:
+        if isinstance(ev, monitoring.HandshakeFailedEvent):
+            found = True
+            assert ev.host == '127.0.0.1'
+            assert ev.port == 10801
+            assert ev.error_msg
+    assert found
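
The same pattern covers TLS misconfiguration: each failed handshake yields a
HandshakeFailedEvent, and once every candidate address has failed the client
raises ReconnectError. A sketch, using only the SSL parameters that appear in
conftest.py (the certificate paths are hypothetical):

    from pyignite import Client, monitoring
    from pyignite.exceptions import ReconnectError

    class HandshakeAuditListener(monitoring.ConnectionEventListener):
        def on_handshake_fail(self, event):
            print(f'Handshake with {event.host}:{event.port} failed: {event.error_msg}')

    client = Client(use_ssl=True,
                    ssl_certfile='client.pem',   # hypothetical path
                    ssl_ca_certfile='ca.pem',    # hypothetical path
                    event_listeners=[HandshakeAuditListener()])
    try:
        with client.connect([('127.0.0.1', 10800)]):
            pass
    except ReconnectError:
        pass  # no node accepted the handshake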