You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2023/02/09 08:42:25 UTC

[ignite] branch master updated: IGNITE-18615 Implement monitoring events in java thin client (#10501)

This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b537d444be IGNITE-18615 Implement monitoring events in java thin client (#10501)
1b537d444be is described below

commit 1b537d444be5ae5c007c3fd677782a3b5a5274c3
Author: Ivan Daschinskiy <iv...@apache.org>
AuthorDate: Thu Feb 9 11:42:13 2023 +0300

    IGNITE-18615 Implement monitoring events in java thin client (#10501)
---
 .../events/ConnectionClosedEvent.java}             |  37 +--
 .../client/events/ConnectionDescription.java       |  82 ++++++
 .../events/ConnectionEvent.java}                   |  29 +--
 .../events/ConnectionEventListener.java}           |  40 +--
 .../ignite/client/events/HandshakeFailEvent.java   |  64 +++++
 .../events/HandshakeStartEvent.java}               |  27 +-
 .../events/HandshakeSuccessEvent.java}             |  38 +--
 .../apache/ignite/client/events/RequestEvent.java  |  79 ++++++
 .../events/RequestEventListener.java}              |  33 +--
 .../ignite/client/events/RequestFailEvent.java     |  70 ++++++
 .../events/RequestStartEvent.java}                 |  30 +--
 .../ignite/client/events/RequestSuccessEvent.java  |  55 ++++
 .../ignite/configuration/ClientConfiguration.java  |  21 ++
 .../monitoring/EventListenerDemultiplexer.java     | 222 ++++++++++++++++
 .../client/thin/ClientChannelConfiguration.java    |  10 +
 .../internal/client/thin/ClientOperation.java      |   3 +
 .../internal/client/thin/ProtocolContext.java      |  17 ++
 .../internal/client/thin/ProtocolVersion.java      |   2 +-
 .../internal/client/thin/TcpClientChannel.java     | 279 ++++++++++++++-------
 .../internal/client/thin/io/ClientConnection.java  |  15 ++
 .../io/gridnioserver/GridNioClientConnection.java  |  11 +
 .../thin/io/gridnioserver/GridNioClientParser.java |   2 +-
 .../org/apache/ignite/client/ReliabilityTest.java  |   2 +-
 .../client/thin/events/FakeIgniteServer.java       | 261 +++++++++++++++++++
 .../IgniteClientConnectionEventListenerTest.java   | 233 +++++++++++++++++
 .../IgniteClientRequestEventListenerTest.java      | 155 ++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |   6 +-
 27 files changed, 1613 insertions(+), 210 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
similarity index 59%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
index a31302b5f26..7a6cb901c3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
@@ -15,27 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
 
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
+/** */
+public class ConnectionClosedEvent extends ConnectionEvent {
+    /** */
+    private final Throwable throwable;
+
     /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
+     * @param conn Connection description.
+     * @param throwable Throwable that caused the failure if any.
      */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+    public ConnectionClosedEvent(
+        ConnectionDescription conn,
+        Throwable throwable
+    ) {
+        super(conn);
+
+        this.throwable = throwable;
+    }
 
     /**
-     * Closes the connection.
+     * Get a cause of the failure if any.
+     *
+     * @return A cause of the failure if any.
      */
-    @Override void close();
+    @Nullable public Throwable throwable() {
+        return throwable;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java
new file mode 100644
index 00000000000..672ba735f5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class ConnectionDescription {
+    /** Local connection address. */
+    private final InetSocketAddress locAddr;
+
+    /** Remote connection address. */
+    private final InetSocketAddress rmtAddr;
+
+    /** Server node id. */
+    private final UUID srvNodeId;
+
+    /** */
+    private final String protocol;
+
+    /**
+     * @param locAddr Local connection address.
+     * @param rmtAddr Remote connection address.
+     * @param protocol String representation of a connection protocol details.
+     * @param srvNodeId Server node id.
+     */
+    public ConnectionDescription(InetSocketAddress locAddr, InetSocketAddress rmtAddr, String protocol, UUID srvNodeId) {
+        this.locAddr = locAddr;
+        this.rmtAddr = rmtAddr;
+        this.protocol = protocol;
+        this.srvNodeId = srvNodeId;
+    }
+
+    /**
+     * Gets local address of this connection.
+     *
+     * @return Local network address or {@code null} if non-socket communication is used.
+     */
+    @Nullable public InetSocketAddress localAddress() {
+        return locAddr;
+    }
+
+    /**
+     * Gets address of remote peer of this connection.
+     *
+     * @return Address of remote peer or {@code null} if non-socket communication is used.
+     */
+    @Nullable public InetSocketAddress remoteAddress() {
+        return rmtAddr;
+    }
+
+    /**
+     * @return Server node id.
+     */
+    @Nullable public UUID serverNodeId() {
+        return srvNodeId;
+    }
+
+    /**
+     * @return String representation of connection protocol.
+     */
+    @Nullable public String protocol() {
+        return protocol;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
index a31302b5f26..941fcf3a773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
 
-import java.nio.ByteBuffer;
+/** */
+public abstract class ConnectionEvent {
+    /** */
+    private final ConnectionDescription conn;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
     /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
+     * @param conn Connection description.
      */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+    protected ConnectionEvent(ConnectionDescription conn) {
+        this.conn = conn;
+    }
 
     /**
-     * Closes the connection.
+     * @return Connection description.
      */
-    @Override void close();
+    public ConnectionDescription connectionDescription() {
+        return conn;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
index a31302b5f26..52ad661cc85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
@@ -15,27 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
 
-import java.nio.ByteBuffer;
+import java.util.EventListener;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public interface ConnectionEventListener extends EventListener {
+    /**
+     * @param event Handshake start event.
+     */
+    default void onHandshakeStart(HandshakeStartEvent event) {
+        // No-op.
+    }
+
+    /**
+     * @param event Handshake success event.
+     */
+    default void onHandshakeSuccess(HandshakeSuccessEvent event) {
+        // No-op.
+    }
 
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
     /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
+     * @param event Handshake fail event.
      */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+    default void onHandshakeFail(HandshakeFailEvent event) {
+        // No-op.
+    }
 
     /**
-     * Closes the connection.
+     * @param event Connection closed event (with or without exception).
      */
-    @Override void close();
+    default void onConnectionClosed(ConnectionClosedEvent event) {
+        // No-op.
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java
new file mode 100644
index 00000000000..4af696d7be9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class HandshakeFailEvent extends ConnectionEvent {
+    /** */
+    private final long elapsedTimeNanos;
+
+    /** */
+    private final Throwable throwable;
+
+    /**
+     * @param conn Connection description.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     * @param throwable Throwable that caused the failure.
+     */
+    public HandshakeFailEvent(
+        ConnectionDescription conn,
+        long elapsedTimeNanos,
+        Throwable throwable
+    ) {
+        super(conn);
+
+        this.elapsedTimeNanos = elapsedTimeNanos;
+        this.throwable = throwable;
+    }
+
+    /**
+     * Get the elapsed time of the handshake.
+     *
+     * @param timeUnit Desired time unit in which to return the elapsed time.
+     * @return the elapsed time.
+     */
+    public long elapsedTime(TimeUnit timeUnit) {
+        return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Get a cause of the failure.
+     *
+     * @return a cause of the failure.
+     */
+    public Throwable throwable() {
+        return throwable;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
index a31302b5f26..6c820d0ca5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
@@ -15,27 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
-    /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
-     */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+package org.apache.ignite.client.events;
 
+/** */
+public class HandshakeStartEvent extends ConnectionEvent {
     /**
-     * Closes the connection.
+     * @param conn Connection description.
      */
-    @Override void close();
+    public HandshakeStartEvent(ConnectionDescription conn) {
+        super(conn);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
index a31302b5f26..570b84a2683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
@@ -15,27 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
 
-import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public class HandshakeSuccessEvent extends ConnectionEvent {
+    /** */
+    private final long elapsedTimeNanos;
 
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
     /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
+     * @param conn Connection description.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
      */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+    public HandshakeSuccessEvent(
+        ConnectionDescription conn,
+        long elapsedTimeNanos
+    ) {
+        super(conn);
+
+        this.elapsedTimeNanos = elapsedTimeNanos;
+    }
 
     /**
-     * Closes the connection.
+     * Get the elapsed time of the handshake.
+     *
+     * @param timeUnit Desired time unit in which to return the elapsed time.
+     * @return the elapsed time.
      */
-    @Override void close();
+    public long elapsedTime(TimeUnit timeUnit) {
+        return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java
new file mode 100644
index 00000000000..74a4e1785df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+/** */
+public abstract class RequestEvent {
+    /** */
+    private final ConnectionDescription conn;
+
+    /** */
+    private final long requestId;
+
+    /** */
+    private final short opCode;
+
+    /** */
+    private final String opName;
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     */
+    protected RequestEvent(
+        ConnectionDescription conn,
+        long requestId,
+        short opCode,
+        String opName
+    ) {
+        this.conn = conn;
+        this.requestId = requestId;
+        this.opCode = opCode;
+        this.opName = opName;
+    }
+
+    /**
+     * @return Connection description.
+     */
+    public ConnectionDescription connectionDescription() {
+        return conn;
+    }
+
+    /**
+     * @return Request id.
+     */
+    public long requestId() {
+        return requestId;
+    }
+
+    /**
+     * @return Operation code.
+     */
+    public short operationCode() {
+        return opCode;
+    }
+
+    /**
+     * @return Operation name.
+     */
+    public String operationName() {
+        return opName;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
index a31302b5f26..0a9da249a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
@@ -15,27 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
 
-import java.nio.ByteBuffer;
+import java.util.EventListener;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public interface RequestEventListener extends EventListener {
+    /**
+     * @param event Request start event.
+     */
+    default void onRequestStart(RequestStartEvent event) {
+        // No-op.
+    }
 
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
     /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
+     * @param event Request success event.
      */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+    default void onRequestSuccess(RequestSuccessEvent event) {
+        // No-op.
+    }
 
     /**
-     * Closes the connection.
+     * @param event Request failure event.
      */
-    @Override void close();
+    default void onRequestFail(RequestFailEvent event) {
+        // No-op.
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java
new file mode 100644
index 00000000000..a9cab4569a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class RequestFailEvent extends RequestEvent {
+    /** */
+    private final long elapsedTimeNanos;
+
+    /** */
+    private final Throwable throwable;
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     * @param throwable Throwable that caused the failure.
+     */
+    public RequestFailEvent(
+        ConnectionDescription conn,
+        long requestId,
+        short opCode,
+        String opName,
+        long elapsedTimeNanos,
+        Throwable throwable
+    ) {
+        super(conn, requestId, opCode, opName);
+
+        this.elapsedTimeNanos = elapsedTimeNanos;
+        this.throwable = throwable;
+    }
+
+    /**
+     * Get the elapsed time of the request.
+     *
+     * @param timeUnit Desired time unit in which to return the elapsed time.
+     * @return the elapsed time.
+     */
+    public long elapsedTime(TimeUnit timeUnit) {
+        return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Get a cause of the failure.
+     *
+     * @return a cause of the failure.
+     */
+    public Throwable throwable() {
+        return throwable;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
index a31302b5f26..da1aa57111b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
@@ -15,27 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin.io;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
-    /**
-     * Sends a message.
-     *
-     * @param msg Message buffer.
-     * @param onDone Callback to be invoked when asynchronous send operation completes.
-     */
-    void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+package org.apache.ignite.client.events;
 
+/** */
+public class RequestStartEvent extends RequestEvent {
     /**
-     * Closes the connection.
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
      */
-    @Override void close();
+    public RequestStartEvent(ConnectionDescription conn, long requestId, short opCode, String opName) {
+        super(conn, requestId, opCode, opName);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java
new file mode 100644
index 00000000000..2c3e19be840
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class RequestSuccessEvent extends RequestEvent {
+    /** */
+    private final long elapsedTimeNanos;
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     */
+    public RequestSuccessEvent(
+        ConnectionDescription conn,
+        long requestId,
+        short opCode,
+        String opName,
+        long elapsedTimeNanos
+    ) {
+        super(conn, requestId, opCode, opName);
+
+        this.elapsedTimeNanos = elapsedTimeNanos;
+    }
+
+    /**
+     * Get the elapsed time of the request.
+     *
+     * @param timeUnit Desired time unit in which to return the elapsed time.
+     * @return The elapsed time.
+     */
+    public long elapsedTime(TimeUnit timeUnit) {
+        return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 84b680d0bea..a2d1cdbfbf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.EventListener;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
@@ -155,6 +156,9 @@ public final class ClientConfiguration implements Serializable {
     /** Logger. */
     private IgniteLogger logger;
 
+    /** */
+    private EventListener[] eventListeners;
+
     /**
      * @return Host addresses.
      */
@@ -840,4 +844,21 @@ public final class ClientConfiguration implements Serializable {
     public IgniteLogger getLogger() {
         return logger;
     }
+
+    /**
+     * @param listeners Client event listeners.
+     * @return {@code this} for chaining.
+     */
+    public ClientConfiguration setEventListeners(EventListener... listeners) {
+        eventListeners = listeners;
+
+        return this;
+    }
+
+    /**
+     * @return Client event listeners.
+     */
+    public EventListener[] getEventListeners() {
+        return eventListeners;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java
new file mode 100644
index 00000000000..af9f3c0b7e3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.monitoring;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EventListener;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionDescription;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.client.events.RequestEventListener;
+import org.apache.ignite.client.events.RequestFailEvent;
+import org.apache.ignite.client.events.RequestStartEvent;
+import org.apache.ignite.client.events.RequestSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Routes events to listeners, specified in the client configuration.
+ */
+public class EventListenerDemultiplexer {
+    /** Shared no-op instance, used when no supported listeners are configured. */
+    private static final EventListenerDemultiplexer NO_OP = new EventListenerDemultiplexer();
+
+    /** Request event listeners, {@code null} if there are none. */
+    final List<RequestEventListener> reqEventListeners;
+
+    /** Connection event listeners, {@code null} if there are none. */
+    final List<ConnectionEventListener> connEventListeners;
+
+    /** Logger used to report exceptions thrown by listeners. */
+    final IgniteLogger logger;
+
+    /** Creates a no-op demultiplexer without listeners. */
+    EventListenerDemultiplexer() {
+        reqEventListeners = null;
+        connEventListeners = null;
+        logger = NullLogger.INSTANCE;
+    }
+
+    /** Creates a demultiplexer over the given listeners; empty lists are stored as {@code null}. */
+    EventListenerDemultiplexer(
+        List<RequestEventListener> reqEventListeners,
+        List<ConnectionEventListener> connEventListeners,
+        IgniteLogger logger
+    ) {
+        this.logger = logger;
+
+        if (!F.isEmpty(reqEventListeners))
+            this.reqEventListeners = Collections.unmodifiableList(reqEventListeners);
+        else
+            this.reqEventListeners = null;
+
+        if (!F.isEmpty(connEventListeners))
+            this.connEventListeners = Collections.unmodifiableList(connEventListeners);
+        else
+            this.connEventListeners = null;
+    }
+
+    /**
+     * Creates an event listener demultiplexer; a listener implementing both listener interfaces gets both event kinds.
+     * @param cfg Client configuration.
+     * @return Demultiplexer over the configured listeners, or the shared no-op instance if none are supported.
+     */
+    public static EventListenerDemultiplexer create(ClientConfiguration cfg) {
+        if (F.isEmpty(cfg.getEventListeners()))
+            return NO_OP;
+
+        List<RequestEventListener> reqEventListeners = new ArrayList<>();
+        List<ConnectionEventListener> connEventListeners = new ArrayList<>();
+
+        for (EventListener l: cfg.getEventListeners()) {
+            if (l instanceof RequestEventListener)
+                reqEventListeners.add((RequestEventListener)l);
+
+            if (l instanceof ConnectionEventListener)
+                connEventListeners.add((ConnectionEventListener)l);
+        }
+
+        if (F.isEmpty(reqEventListeners) && F.isEmpty(connEventListeners))
+            return NO_OP;
+
+        return new EventListenerDemultiplexer(reqEventListeners, connEventListeners, NullLogger.whenNull(cfg.getLogger()));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     */
+    public void onRequestStart(ConnectionDescription conn, long requestId, short opCode, String opName) {
+        if (F.isEmpty(reqEventListeners))
+            return;
+
+        executeForEach(reqEventListeners, l -> l.onRequestStart(new RequestStartEvent(conn, requestId, opCode, opName)));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     */
+    public void onRequestSuccess(
+        ConnectionDescription conn,
+        long requestId,
+        short opCode,
+        String opName,
+        long elapsedTimeNanos
+    ) {
+        if (F.isEmpty(reqEventListeners))
+            return;
+
+        executeForEach(reqEventListeners, l ->
+            l.onRequestSuccess(new RequestSuccessEvent(conn, requestId, opCode, opName, elapsedTimeNanos)));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param requestId Request id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     * @param throwable Throwable that caused the failure.
+     */
+    public void onRequestFail(
+        ConnectionDescription conn,
+        long requestId,
+        short opCode,
+        String opName,
+        long elapsedTimeNanos,
+        Throwable throwable
+    ) {
+        if (F.isEmpty(reqEventListeners))
+            return;
+
+        executeForEach(reqEventListeners, l ->
+            l.onRequestFail(new RequestFailEvent(conn, requestId, opCode, opName, elapsedTimeNanos, throwable)));
+    }
+
+    /**
+     * @param conn Connection description.
+     */
+    public void onHandshakeStart(ConnectionDescription conn) {
+        if (F.isEmpty(connEventListeners))
+            return;
+
+        executeForEach(connEventListeners, l -> l.onHandshakeStart(new HandshakeStartEvent(conn)));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     */
+    public void onHandshakeSuccess(ConnectionDescription conn, long elapsedTimeNanos) {
+        if (F.isEmpty(connEventListeners))
+            return;
+
+        executeForEach(connEventListeners, l -> l.onHandshakeSuccess(new HandshakeSuccessEvent(conn, elapsedTimeNanos)));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     * @param throwable Throwable that caused the failure.
+     */
+    public void onHandshakeFail(ConnectionDescription conn, long elapsedTimeNanos, Throwable throwable) {
+        if (F.isEmpty(connEventListeners))
+            return;
+
+        executeForEach(connEventListeners, l -> l.onHandshakeFail(new HandshakeFailEvent(conn, elapsedTimeNanos, throwable)));
+    }
+
+    /**
+     * @param conn Connection description.
+     * @param throwable Throwable that caused the failure if any.
+     */
+    public void onConnectionClosed(ConnectionDescription conn, Throwable throwable) {
+        if (F.isEmpty(connEventListeners))
+            return;
+
+        executeForEach(connEventListeners, l -> l.onConnectionClosed(new ConnectionClosedEvent(conn, throwable)));
+    }
+
+    /** Invokes the action on each listener; an exception thrown by one listener is logged and the rest still run. */
+    private <T> void executeForEach(List<T> listeners, Consumer<T> action) {
+        assert !F.isEmpty(listeners);
+
+        for (T listener: listeners) {
+            try {
+                action.accept(listener);
+            }
+            catch (Exception e) {
+                logger.warning("Exception thrown while consuming event in listener " + listener, e);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
index e81dc9204c8..f22ce8f91e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.client.SslMode;
 import org.apache.ignite.client.SslProtocol;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.monitoring.EventListenerDemultiplexer;
 
 /**
  * Configuration required to initialize {@link TcpClientChannel}.
@@ -110,6 +111,9 @@ final class ClientChannelConfiguration {
     /** */
     private final IgniteLogger logger;
 
+    /** Event listener demultiplexer built from the client configuration. */
+    private final EventListenerDemultiplexer eventListener;
+
     /**
      * Constructor.
      */
@@ -141,6 +145,7 @@ final class ClientChannelConfiguration {
         this.heartbeatInterval = cfg.getHeartbeatInterval();
         this.autoBinaryConfigurationEnabled = cfg.isAutoBinaryConfigurationEnabled();
         this.logger = cfg.getLogger();
+        this.eventListener = EventListenerDemultiplexer.create(cfg);
     }
 
     /**
@@ -325,4 +330,9 @@ final class ClientChannelConfiguration {
     public IgniteLogger getLogger() {
         return logger;
     }
+
+    /** @return Event listener demultiplexer. */
+    public EventListenerDemultiplexer eventListener() {
+        return eventListener;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index ddd12c1bd47..c291a0e3e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -24,6 +24,9 @@ import org.jetbrains.annotations.Nullable;
 
 /** Operation codes. */
 public enum ClientOperation {
+    /** Handshake. */
+    HANDSHAKE(-1),
+
     /** Resource close. */
     RESOURCE_CLOSE(0),
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
index ec8fb6e2f51..98ee17871a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.thin;
 
 import java.util.EnumSet;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Protocol Context.
@@ -30,6 +31,13 @@ public class ProtocolContext {
     /** Features. */
     private final EnumSet<ProtocolBitmaskFeature> features;
 
+    /**
+     * @param ver Protocol version; {@code null} is passed as the feature set.
+     */
+    public ProtocolContext(ProtocolVersion ver) {
+        this(ver, null);
+    }
+
     /**
      * @param ver Protocol version.
      * @param features Supported features.
@@ -87,4 +95,13 @@ public class ProtocolContext {
     public static boolean isFeatureSupported(ProtocolVersion ver, ProtocolVersionFeature feature) {
         return ver.compareTo(feature.verIntroduced()) >= 0;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(
+            ProtocolContext.class.getSimpleName(),
+            "version", version(), false,
+            "features", features.toString(), false
+        );
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
index bf2e112113c..4d3a3feb0e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
@@ -56,7 +56,7 @@ public final class ProtocolVersion implements Comparable<ProtocolVersion> {
     private final short patch;
 
     /** Constructor. */
-    ProtocolVersion(short major, short minor, short patch) {
+    public ProtocolVersion(short major, short minor, short patch) {
         this.major = major;
         this.minor = minor;
         this.patch = patch;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index a97298bfe7d..5d2d38f09c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -48,6 +48,7 @@ import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.ClientReconnectedException;
+import org.apache.ignite.client.events.ConnectionDescription;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.monitoring.EventListenerDemultiplexer;
 import org.apache.ignite.internal.client.thin.io.ClientConnection;
 import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
@@ -162,6 +164,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Log. */
     private final IgniteLogger log;
 
+    /** Event listener demultiplexer. */
+    private final EventListenerDemultiplexer eventListener;
+
+    /** Connection description; assigned in the constructor after the handshake call returns. */
+    private final ConnectionDescription connDesc;
+
     /** Last send operation timestamp. */
     private volatile long lastSendMillis;
 
@@ -172,6 +180,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
         log = NullLogger.whenNull(cfg.getLogger());
 
+        eventListener = cfg.eventListener();
+
         for (ClientNotificationType type : ClientNotificationType.values()) {
             if (type.keepNotificationsWithoutListener())
                 pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
@@ -189,6 +199,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
         handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
 
+        assert protocolCtx != null : "Protocol context after handshake is null";
+
+        connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId);
+
         heartbeatTimer = protocolCtx.isFeatureSupported(HEARTBEAT) && cfg.getHeartbeatEnabled()
                 ? initHeartbeat(cfg.getHeartbeatInterval())
                 : null;
@@ -219,6 +233,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      */
     private void close(Exception cause) {
         if (closed.compareAndSet(false, true)) {
+            // Skip notification if handshake is not successful or completed.
+            ConnectionDescription connDesc0 = connDesc;
+            if (connDesc0 != null)
+                eventListener.onConnectionClosed(connDesc0, cause);
+
             if (heartbeatTimer != null)
                 heartbeatTimer.cancel();
 
@@ -279,17 +298,25 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
         throws ClientException {
         long id = reqId.getAndIncrement();
+        long startTimeNanos = System.nanoTime();
 
         PayloadOutputChannel payloadCh = new PayloadOutputChannel(this);
 
         try {
-            if (closed())
-                throw new ClientConnectionException("Channel is closed");
+            if (closed()) {
+                ClientConnectionException err = new ClientConnectionException("Channel is closed");
 
-            ClientRequestFuture fut = new ClientRequestFuture();
+                eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+                throw err;
+            }
+
+            ClientRequestFuture fut = new ClientRequestFuture(id, op, startTimeNanos);
 
             pendingReqs.put(id, fut);
 
+            eventListener.onRequestStart(connDesc, id, op.code(), op.name());
+
             BinaryOutputStream req = payloadCh.out();
 
             req.writeInt(0); // Reserve an integer for the request size.
@@ -311,6 +338,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             // Potential double-close is handled in PayloadOutputChannel.
             payloadCh.close();
 
+            eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, t);
+
             throw t;
         }
     }
@@ -322,18 +351,29 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      */
     private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader)
         throws ClientException {
+        long requestId = pendingReq.requestId;
+        ClientOperation op = pendingReq.operation;
+        long startTimeNanos = pendingReq.startTimeNanos;
+
         try {
             ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
 
-            if (payload == null || payloadReader == null)
-                return null;
+            T res = null;
+            if (payload != null && payloadReader != null)
+                res = payloadReader.apply(new PayloadInputChannel(this, payload));
+
+            eventListener.onRequestSuccess(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos);
 
-            return payloadReader.apply(new PayloadInputChannel(this, payload));
+            return res;
         }
         catch (IgniteCheckedException e) {
             log.warning("Failed to process response: " + e.getMessage(), e);
 
-            throw convertException(e);
+            RuntimeException err = convertException(e);
+
+            eventListener.onRequestFail(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+            throw err;
         }
     }
 
@@ -346,22 +386,30 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      */
     private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader) {
         CompletableFuture<T> fut = new CompletableFuture<>();
+        long requestId = pendingReq.requestId;
+        ClientOperation op = pendingReq.operation;
+        long startTimeNanos = pendingReq.startTimeNanos;
 
         pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() -> {
             try {
                 ByteBuffer payload = payloadFut.get();
 
-                if (payload == null || payloadReader == null)
-                    fut.complete(null);
-                else {
-                    T res = payloadReader.apply(new PayloadInputChannel(this, payload));
-                    fut.complete(res);
-                }
+                T res = null;
+                if (payload != null && payloadReader != null)
+                    res = payloadReader.apply(new PayloadInputChannel(this, payload));
+
+                eventListener.onRequestSuccess(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos);
+
+                fut.complete(res);
             }
             catch (Throwable t) {
                 log.warning("Failed to process response: " + t.getMessage(), t);
 
-                fut.completeExceptionally(convertException(t));
+                RuntimeException err = convertException(t);
+
+                eventListener.onRequestFail(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+                fut.completeExceptionally(err);
             }
         }));
 
@@ -601,17 +649,114 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
-        ClientRequestFuture fut = new ClientRequestFuture();
-        pendingReqs.put(-1L, fut);
+        long requestId = -1L;
+        long startTime = System.nanoTime();
 
-        handshakeReq(ver, user, pwd, userAttrs);
+        eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+            new ProtocolContext(ver).toString(), null));
 
-        try {
-            ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
-            handshakeRes(res, ver, user, pwd, userAttrs);
-        }
-        catch (IgniteCheckedException e) {
-            throw new ClientConnectionException(e.getMessage(), e);
+        while (true) {
+            ClientRequestFuture fut = new ClientRequestFuture(requestId, ClientOperation.HANDSHAKE);
+
+            pendingReqs.put(requestId, fut);
+
+            handshakeReq(ver, user, pwd, userAttrs);
+
+            try {
+                ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get();
+
+                BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
+
+                try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
+                    boolean success = res.readBoolean();
+
+                    if (success) {
+                        byte[] features = EMPTY_BYTES;
+
+                        if (ProtocolContext.isFeatureSupported(ver, BITMAP_FEATURES))
+                            features = reader.readByteArray();
+
+                        protocolCtx = new ProtocolContext(ver, ProtocolBitmaskFeature.enumSet(features));
+
+                        if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
+                            // Reading server UUID
+                            srvNodeId = reader.readUuid();
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Handshake succeeded [protocolVersion=" + protocolCtx.version() + ", srvNodeId=" + srvNodeId + ']');
+
+                        eventListener.onHandshakeSuccess(
+                            new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId),
+                            System.nanoTime() - startTime
+                        );
+
+                        break;
+                    }
+                    else {
+                        ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());
+
+                        String err = reader.readString();
+                        int errCode = ClientStatus.FAILED;
+
+                        if (res.remaining() > 0)
+                            errCode = reader.readInt();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Handshake failed [protocolVersion=" + srvVer + ", err=" + err + ", errCode=" + errCode + ']');
+
+                        RuntimeException resultErr = null;
+                        if (errCode == ClientStatus.AUTH_FAILED)
+                            resultErr = new ClientAuthenticationException(err);
+                        else if (ver.equals(srvVer))
+                            resultErr = new ClientProtocolError(err);
+                        else if (!supportedVers.contains(srvVer) ||
+                            (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user))) {
+                            // Server version is not supported by this client OR server version is less than 1.1.0 supporting
+                            // authentication and authentication is required.
+                            resultErr = new ClientProtocolError(String.format(
+                                "Protocol version mismatch: client %s / server %s. Server details: %s",
+                                ver,
+                                srvVer,
+                                err
+                            ));
+                        }
+
+                        if (resultErr != null) {
+                            ConnectionDescription connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+                                new ProtocolContext(ver).toString(), null);
+
+                            long elapsedNanos = System.nanoTime() - startTime;
+
+                            eventListener.onHandshakeFail(connDesc, elapsedNanos, resultErr);
+
+                            throw resultErr;
+                        }
+                        else {
+                            // Retry with server version.
+                            if (log.isDebugEnabled())
+                                log.debug("Retrying handshake with server version [protocolVersion=" + srvVer + ']');
+
+                            ver = srvVer;
+                        }
+                    }
+                }
+            }
+            catch (IOException | IgniteCheckedException e) {
+                ClientException err;
+                if (e instanceof IOException)
+                    err = handleIOError((IOException)e);
+                else
+                    err = new ClientConnectionException(e.getMessage(), e);
+
+                eventListener.onHandshakeFail(
+                    new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), new ProtocolContext(ver).toString(), null),
+                    System.nanoTime() - startTime,
+                    err
+                );
+
+                throw err;
+            }
         }
     }
 
@@ -665,70 +810,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         return new ProtocolContext(ver, features);
     }
 
-    /** Receive and handle handshake response. */
-    private void handshakeRes(ByteBuffer buf, ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
-        throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
-        BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
-
-        try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
-            boolean success = res.readBoolean();
-
-            if (success) {
-                byte[] features = EMPTY_BYTES;
-
-                if (ProtocolContext.isFeatureSupported(proposedVer, BITMAP_FEATURES))
-                    features = reader.readByteArray();
-
-                protocolCtx = new ProtocolContext(proposedVer, ProtocolBitmaskFeature.enumSet(features));
-
-                if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
-                    // Reading server UUID
-                    srvNodeId = reader.readUuid();
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Handshake succeeded [protocolVersion=" + protocolCtx.version() + ", srvNodeId=" + srvNodeId + ']');
-            }
-            else {
-                ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());
-
-                String err = reader.readString();
-                int errCode = ClientStatus.FAILED;
-
-                if (res.remaining() > 0)
-                    errCode = reader.readInt();
-
-                if (log.isDebugEnabled())
-                    log.debug("Handshake failed [protocolVersion=" + srvVer + ", err=" + err + ", errCode=" + errCode + ']');
-
-                if (errCode == ClientStatus.AUTH_FAILED)
-                    throw new ClientAuthenticationException(err);
-                else if (proposedVer.equals(srvVer))
-                    throw new ClientProtocolError(err);
-                else if (!supportedVers.contains(srvVer) ||
-                    (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user)))
-                    // Server version is not supported by this client OR server version is less than 1.1.0 supporting
-                    // authentication and authentication is required.
-                    throw new ClientProtocolError(String.format(
-                        "Protocol version mismatch: client %s / server %s. Server details: %s",
-                        proposedVer,
-                        srvVer,
-                        err
-                    ));
-                else {
-                    // Retry with server version.
-                    if (log.isDebugEnabled())
-                        log.debug("Retrying handshake with server version [protocolVersion=" + srvVer + ']');
-
-                    handshake(srvVer, user, pwd, userAttrs);
-                }
-            }
-        }
-        catch (IOException e) {
-            throw handleIOError(e);
-        }
-    }
-
     /** Write bytes to the output stream. */
     private void write(byte[] bytes, int len, @Nullable Runnable onDone) throws ClientConnectionException {
         ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
@@ -804,10 +885,28 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         return res;
     }
 
-    /**
-     *
-     */
+    /** Pending request future carrying the request id, operation and start time used for monitoring events. */
     private static class ClientRequestFuture extends GridFutureAdapter<ByteBuffer> {
+        /** Request start time, a {@code System.nanoTime()} value. */
+        final long startTimeNanos;
+
+        /** Request id. */
+        final long requestId;
+
+        /** Operation. */
+        final ClientOperation operation;
+
+        /** Creates a future whose start time is the current {@code System.nanoTime()} value. */
+        ClientRequestFuture(long requestId, ClientOperation op) {
+            this(requestId, op, System.nanoTime());
+        }
+
+        /** Creates a future with an explicitly given start time. */
+        ClientRequestFuture(long requestId, ClientOperation op, long startTimeNanos) {
+            this.requestId = requestId;
+            operation = op;
+            this.startTimeNanos = startTimeNanos;
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
index a31302b5f26..8bd01bbc6c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client.thin.io;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.ignite.IgniteCheckedException;
@@ -34,6 +35,20 @@ public interface ClientConnection extends AutoCloseable {
      */
     void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
 
+    /**
+     * Gets local address of this session.
+     *
+     * @return Local network address or {@code null} if non-socket communication is used.
+     */
+    @Nullable InetSocketAddress localAddress();
+
+    /**
+     * Gets address of remote peer on this session.
+     *
+     * @return Address of remote peer or {@code null} if non-socket communication is used.
+     */
+    @Nullable InetSocketAddress remoteAddress();
+
     /**
      * Closes the connection.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
index 5a87444cbcf..fe2e894acde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client.thin.io.gridnioserver;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.ignite.IgniteCheckedException;
@@ -62,6 +63,16 @@ class GridNioClientConnection implements ClientConnection {
         ses.addMeta(SES_META_CONN, this);
     }
 
+    /** {@inheritDoc} */
+    @Override @Nullable public InetSocketAddress localAddress() {
+        return ses.localAddress();
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public InetSocketAddress remoteAddress() {
+        return ses.remoteAddress();
+    }
+
     /** {@inheritDoc} */
     @Override public void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException {
         if (onDone != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
index 439c78a72c3..bb20ca63259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client message parser.
  */
-class GridNioClientParser implements GridNioParser {
+public class GridNioClientParser implements GridNioParser {
     /** */
     private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 56b4b648958..1a077004b8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -349,7 +349,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
 
         String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
 
-        long expectedNullCnt = 19;
+        long expectedNullCnt = 20;
 
         String msg = nullOps.size()
                 + " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java
new file mode 100644
index 00000000000..546506325f8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.EnumSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.client.thin.ProtocolContext;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.internal.client.thin.ProtocolVersionFeature;
+import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientParser;
+import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.testframework.junits.JUnitAssertAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A fake ignite server for testing handshake and connection errors handling on the thin client side.
+ */
+public class FakeIgniteServer extends JUnitAssertAware implements GridNioServerListener<ByteBuffer>, AutoCloseable {
+    /** Session meta key that is set once a session has passed the handshake successfully. */
+    static final int HANDSHAKE_PERFORMED = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Empty byte array, written as the feature bitmap in a successful handshake response. */
+    public static final byte[] EMPTY_BYTES = new byte[0];
+
+    /** NIO server that accepts client connections. */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** Error types to inject. */
+    private final EnumSet<ErrorType> errorTypes;
+
+    /** Protocol version this server accepts and reports in a rejected handshake. */
+    private final ProtocolVersion protoVer;
+
+    /** Fake node id, sent to the client on a successful handshake. */
+    private final UUID nodeId = UUID.randomUUID();
+
+    /** Creates a server with the default protocol version and no injected errors. */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger) {
+        this(addr, port, logger, null, null);
+    }
+
+    /** Creates a server with the default protocol version that injects the given error types. */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, EnumSet<ErrorType> errorTypes) {
+        this(addr, port, logger, null, errorTypes);
+    }
+
+    /** Creates a server with the given protocol version and no injected errors. */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, ProtocolVersion protoVer) {
+        this(addr, port, logger, protoVer, null);
+    }
+
+    /** Creates a server; a {@code null} version defaults to {@code V1_7_0}, {@code null} error types to an empty set. */
+    public FakeIgniteServer(
+        InetAddress addr,
+        int port,
+        IgniteLogger logger,
+        ProtocolVersion protoVer,
+        EnumSet<ErrorType> errorTypes
+    ) {
+        this.protoVer = protoVer != null ? protoVer : ProtocolVersion.V1_7_0;
+
+        this.errorTypes = errorTypes != null ? errorTypes : EnumSet.noneOf(ErrorType.class);
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                .address(addr)
+                .port(port)
+                .listener(this)
+                .logger(logger)
+                .selectorCount(1)
+                .byteOrder(ByteOrder.LITTLE_ENDIAN)
+                .directBuffer(true)
+                .directMode(false)
+                .filters(
+                    new GridNioCodecFilter(new GridNioClientParser(), logger, false)
+                )
+                .build();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to initialize fake server", e);
+        }
+    }
+
+    /** Starts the underlying NIO server. */
+    public void start() throws IgniteCheckedException {
+        srv.start();
+    }
+
+    /** Stops the underlying NIO server. */
+    public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onConnected(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(GridNioSession ses, ByteBuffer msg) {
+        if (ses.meta(HANDSHAKE_PERFORMED) == null) { // No successful handshake on this session yet.
+            if (errorTypes.contains(ErrorType.HANDSHAKE_CONNECTION_ERROR)) {
+                ses.close(); // Drop the connection without any response.
+
+                return;
+            }
+
+            BinaryInputStream res = BinaryByteBufferInputStream.create(msg);
+            try (BinaryReaderExImpl reader = new BinaryReaderExImpl(null, res, null, null, true, true)) {
+                byte reqType = reader.readByte();
+
+                assertEquals(ClientListenerRequest.HANDSHAKE, reqType);
+
+                ProtocolVersion clientVer = new ProtocolVersion(reader.readShort(), reader.readShort(), reader.readShort());
+
+                ByteBuffer response = createMessage(writer -> {
+                    if (errorTypes.contains(ErrorType.HANDSHAKE_ERROR) || errorTypes.contains(ErrorType.AUTHENTICATION_ERROR)
+                        || protoVer.compareTo(clientVer) != 0) {
+                        writer.writeBoolean(false); // Reject: server version, error message and status code follow.
+
+                        writer.writeShort(protoVer.major());
+                        writer.writeShort(protoVer.minor());
+                        writer.writeShort(protoVer.patch());
+
+                        if (errorTypes.contains(ErrorType.AUTHENTICATION_ERROR)) {
+                            writer.writeString("Authentication failed");
+                            writer.writeInt(ClientStatus.AUTH_FAILED);
+                        }
+                        else {
+                            writer.writeString("Handshake failed");
+                            writer.writeInt(ClientStatus.FAILED);
+                        }
+                    }
+                    else {
+                        writer.writeBoolean(true); // Accept: optional feature bitmap and node id follow.
+
+                        if (ProtocolContext.isFeatureSupported(protoVer, ProtocolVersionFeature.BITMAP_FEATURES))
+                            writer.writeByteArray(EMPTY_BYTES);
+
+                        if (ProtocolContext.isFeatureSupported(protoVer, ProtocolVersionFeature.PARTITION_AWARENESS))
+                            writer.writeUuid(nodeId);
+
+                        ses.addMeta(HANDSHAKE_PERFORMED, true);
+                    }
+                });
+
+                ses.send(response);
+            }
+            catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        else { // Handshake done: requests are never answered, the session is only closed on demand.
+            if (errorTypes.contains(ErrorType.CONNECTION_ERROR))
+                ses.close();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionWriteTimeout(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionIdleTimeout(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFailure(FailureType failureType, Throwable failure) {
+        // No-op.
+    }
+
+    /** Creates a message with a 4-byte length header followed by the payload written by {@code writerAction}. */
+    private ByteBuffer createMessage(Consumer<BinaryRawWriter> writerAction) {
+        try (BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(32), null, null)) {
+            writer.writeInt(0); // Placeholder for the size, patched below.
+
+            writerAction.accept(writer);
+
+            writer.out().writeInt(0, writer.out().position() - 4); // Actual payload size, header excluded.
+
+            return ByteBuffer.wrap(writer.out().arrayCopy(), 0, writer.out().position());
+        }
+    }
+
+    /**
+     * @return Fake node id.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Injected error types.
+     */
+    public enum ErrorType {
+        /** Generic handshake error. */
+        HANDSHAKE_ERROR,
+
+        /** Connection error on handshake. */
+        HANDSHAKE_CONNECTION_ERROR,
+
+        /** Authentication error. */
+        AUTHENTICATION_ERROR,
+
+        /** Connection error. */
+        CONNECTION_ERROR
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java
new file mode 100644
index 00000000000..8712b6985d3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.EventListener;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionEvent;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests connection event listeners of a thin client.
+ */
+public class IgniteClientConnectionEventListenerTest extends GridCommonAbstractTest {
+    /** Loopback address the fake server is bound to. */
+    private static final InetAddress LOCALHOST;
+
+    /** Port the fake server listens on. */
+    private static final int SRV_PORT = 10800;
+
+    static {
+        try {
+            LOCALHOST = InetAddress.getByName("127.0.0.1");
+        }
+        catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Checks handshake start, handshake success and connection closed events of a successful connection. */
+    @Test
+    public void testBasic() throws Exception {
+        ProtocolVersion srvVer = ProtocolVersion.V1_6_0;
+        try (FakeIgniteServer srv = new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), srvVer)) {
+            srv.start();
+
+            Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+            ConnectionEventListener lsnr = new ConnectionEventListener() {
+                @Override public void onHandshakeStart(HandshakeStartEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onHandshakeSuccess(HandshakeSuccessEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+            };
+
+            long startNano = System.nanoTime();
+            try (IgniteClient ignored = startClient(lsnr)) {
+                assertTrue(GridTestUtils.waitForCondition(() -> evSet.size() == 2, GridTestUtils.DFLT_TEST_TIMEOUT));
+
+                HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class);
+
+                assertEquals("ProtocolContext [version=" + ProtocolVersion.LATEST_VER + ", features=[]]",
+                    hsStartEv.connectionDescription().protocol());
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress());
+                assertNull(hsStartEv.connectionDescription().serverNodeId());
+
+                HandshakeSuccessEvent hsSuccEv = (HandshakeSuccessEvent)evSet.get(HandshakeSuccessEvent.class);
+
+                assertEquals("ProtocolContext [version=" + srvVer + ", features=[]]", hsSuccEv.connectionDescription().protocol());
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsSuccEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().localAddress().getAddress());
+                assertEquals(srv.nodeId(), hsSuccEv.connectionDescription().serverNodeId());
+                assertTrue(System.nanoTime() - startNano >= hsSuccEv.elapsedTime(TimeUnit.NANOSECONDS));
+            }
+
+            assertTrue(GridTestUtils.waitForCondition(() -> evSet.size() == 3, GridTestUtils.DFLT_TEST_TIMEOUT));
+
+            ConnectionClosedEvent closedEv = (ConnectionClosedEvent)evSet.get(ConnectionClosedEvent.class);
+
+            assertEquals("ProtocolContext [version=" + srvVer + ", features=[]]", closedEv.connectionDescription().protocol());
+            assertEquals(LOCALHOST, closedEv.connectionDescription().remoteAddress().getAddress());
+            assertEquals(SRV_PORT, closedEv.connectionDescription().remoteAddress().getPort());
+            assertEquals(LOCALHOST, closedEv.connectionDescription().localAddress().getAddress());
+            assertEquals(srv.nodeId(), closedEv.connectionDescription().serverNodeId());
+        }
+    }
+
+    /** Checks the handshake fail event when the server requires a protocol version newer than the latest one. */
+    @Test
+    public void testUnsupportedProtocolFail() {
+        ProtocolVersion unsupportedProto = new ProtocolVersion((short)255, (short)0, (short)0);
+        assertTrue(unsupportedProto.compareTo(ProtocolVersion.LATEST_VER) > 0);
+
+        long startNano = System.nanoTime();
+        testFail(
+            () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), unsupportedProto),
+            (HandshakeFailEvent event, Throwable hsErr) -> {
+                assertTrue(System.nanoTime() - startNano >= event.elapsedTime(TimeUnit.NANOSECONDS));
+                assertEquals(hsErr, event.throwable());
+            },
+            HandshakeFailEvent.class
+        );
+    }
+
+    /** Checks the handshake fail event for each handshake-related error type injected by the fake server. */
+    @Test
+    public void testHandshakeFail() {
+        Stream.of(FakeIgniteServer.ErrorType.HANDSHAKE_CONNECTION_ERROR, FakeIgniteServer.ErrorType.HANDSHAKE_ERROR,
+            FakeIgniteServer.ErrorType.AUTHENTICATION_ERROR).forEach(errType -> {
+                AtomicLong startNano = new AtomicLong(System.nanoTime());
+                testFail(
+                    () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(errType)),
+                    (HandshakeFailEvent event, Throwable hsErr) -> {
+                        assertTrue(System.nanoTime() - startNano.get() >= event.elapsedTime(TimeUnit.NANOSECONDS));
+                        assertEquals(hsErr, event.throwable());
+                    },
+                    HandshakeFailEvent.class
+                );
+            });
+    }
+
+    /** Checks the connection closed event when the server drops the connection on the first request. */
+    @Test
+    public void testConnectionLost() {
+        testFail(
+            () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(FakeIgniteServer.ErrorType.CONNECTION_ERROR)),
+            IgniteClient::cacheNames,
+            (ev, t) -> {},
+            ConnectionClosedEvent.class
+        );
+    }
+
+    /** Same as the four-argument overload with a client action that calls {@code fail()} if the client starts. */
+    private <Event extends ConnectionEvent> void testFail(
+        Supplier<FakeIgniteServer> srvFactory,
+        BiConsumer<Event, Throwable> checkEventAction,
+        Class<Event> eventCls
+    ) {
+        testFail(srvFactory, client -> fail(), checkEventAction, eventCls);
+    }
+
+    /** Starts a fake server and a client, runs the client action and checks the single resulting event. */
+    private <Event extends ConnectionEvent> void testFail(
+        Supplier<FakeIgniteServer> srvFactory,
+        Consumer<IgniteClient> clientAction,
+        BiConsumer<Event, Throwable> checkEventAction,
+        Class<Event> eventCls
+    ) {
+        try (FakeIgniteServer srv = srvFactory.get()) {
+            srv.start();
+
+            Throwable hsErr = null;
+            Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+            ConnectionEventListener lsnr = new ConnectionEventListener() {
+                @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onHandshakeFail(HandshakeFailEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+            };
+
+            try (IgniteClient cli = startClient(lsnr)) {
+                clientAction.accept(cli);
+            }
+            catch (Exception e) { // Not Throwable: an AssertionError raised by the client action must fail the test.
+                hsErr = e;
+            }
+
+            assertTrue(GridTestUtils.waitForCondition(() -> !evSet.isEmpty(), GridTestUtils.DFLT_TEST_TIMEOUT));
+            assertEquals(1, evSet.size());
+
+            Event failEv = eventCls.cast(evSet.get(eventCls));
+
+            assertNotNull(failEv);
+            assertEquals("ProtocolContext [version=" + ProtocolVersion.V1_7_0 + ", features=[]]",
+                failEv.connectionDescription().protocol());
+            assertEquals(LOCALHOST, failEv.connectionDescription().remoteAddress().getAddress());
+            assertEquals(SRV_PORT, failEv.connectionDescription().remoteAddress().getPort());
+            assertEquals(LOCALHOST, failEv.connectionDescription().localAddress().getAddress());
+
+            if (failEv.connectionDescription().serverNodeId() != null)
+                assertEquals(srv.nodeId(), failEv.connectionDescription().serverNodeId());
+
+            checkEventAction.accept(failEv, hsErr);
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed event test", e);
+        }
+    }
+
+    /** Starts a thin client with the given listeners, connected to the fake server by its literal IP address. */
+    private IgniteClient startClient(EventListener... listeners) {
+        String addr = LOCALHOST.getHostAddress() + ":" + SRV_PORT;
+
+        return Ignition.startClient(new ClientConfiguration().setAddresses(addr).setEventListeners(listeners));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
new file mode 100644
index 00000000000..1e704752b64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.ConnectionDescription;
+import org.apache.ignite.client.events.RequestEvent;
+import org.apache.ignite.client.events.RequestEventListener;
+import org.apache.ignite.client.events.RequestFailEvent;
+import org.apache.ignite.client.events.RequestStartEvent;
+import org.apache.ignite.client.events.RequestSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
+import org.apache.ignite.internal.client.thin.ClientOperation;
+import org.junit.Test;
+
+/**
+ * Tests request event listeners of a thin client.
+ */
+public class IgniteClientRequestEventListenerTest extends AbstractThinClientTest {
+    /** Last received request event of each event class. */
+    Map<Class<? extends RequestEvent>, RequestEvent> evSet = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected ClientConfiguration getClientConfiguration() {
+        return super.getClientConfiguration()
+            .setEventListeners(new RequestEventListener() {
+                @Override public void onRequestStart(RequestStartEvent event) {
+                    if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+                        evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onRequestSuccess(RequestSuccessEvent event) {
+                    if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+                        evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onRequestFail(RequestFailEvent event) {
+                    if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+                        evSet.put(event.getClass(), event);
+                }
+            });
+    }
+
+    /** Checks that a successful request fires start and success events describing the same request. */
+    @Test
+    public void testQuerySuccessEvents() {
+        long startTime = System.nanoTime();
+        try (IgniteClient cli = startClient(0)) {
+            cli.cacheNames();
+
+            assertEquals(2, evSet.size());
+
+            RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
+
+            assertTrue(startEvent.requestId() >= 0);
+
+            ConnectionDescription connDesc = startEvent.connectionDescription();
+            assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+            assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+            assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+            assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+            assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvent.operationCode());
+            assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvent.operationName());
+
+            RequestSuccessEvent successEvent = (RequestSuccessEvent)evSet.get(RequestSuccessEvent.class);
+            assertEquals(startEvent.requestId(), successEvent.requestId());
+
+            connDesc = successEvent.connectionDescription();
+            assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+            assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+            assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+            assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+            assertEquals(ClientOperation.CACHE_GET_NAMES.code(), successEvent.operationCode());
+            assertEquals(ClientOperation.CACHE_GET_NAMES.name(), successEvent.operationName());
+
+            assertTrue(System.nanoTime() - startTime >= successEvent.elapsedTime(TimeUnit.NANOSECONDS));
+
+        }
+    }
+
+    /** Checks that a failed request fires start and fail events describing the same request. */
+    @Test
+    public void testQueryFailEvents() {
+        long startTime = System.nanoTime();
+        try (IgniteClient cli = startClient(0)) {
+            cli.cache("non-existent").put(1, 1);
+
+            fail();
+        }
+        catch (ClientException err) {
+            assertEquals(2, evSet.size());
+
+            RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
+
+            assertTrue(startEvent.requestId() >= 0);
+
+            ConnectionDescription connDesc = startEvent.connectionDescription();
+            assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+            assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+            assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+            assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+            assertEquals(ClientOperation.CACHE_PUT.code(), startEvent.operationCode());
+            assertEquals(ClientOperation.CACHE_PUT.name(), startEvent.operationName());
+
+            RequestFailEvent failEvent = (RequestFailEvent)evSet.get(RequestFailEvent.class);
+            assertEquals(startEvent.requestId(), failEvent.requestId());
+
+            connDesc = failEvent.connectionDescription();
+            assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+            assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+            assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+            assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+            assertEquals(ClientOperation.CACHE_PUT.code(), failEvent.operationCode());
+            assertEquals(ClientOperation.CACHE_PUT.name(), failEvent.operationName());
+
+            assertEquals(err, failEvent.throwable());
+
+            assertTrue(System.nanoTime() - startTime >= failEvent.elapsedTime(TimeUnit.NANOSECONDS));
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 83b6f0fa86d..a6f6ae7ee76 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResour
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
 import org.apache.ignite.internal.client.thin.TimeoutTest;
+import org.apache.ignite.internal.client.thin.events.IgniteClientConnectionEventListenerTest;
+import org.apache.ignite.internal.client.thin.events.IgniteClientRequestEventListenerTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -80,7 +82,9 @@ import org.junit.runners.Suite;
     BinaryConfigurationTest.class,
     IgniteSetTest.class,
     DataReplicationOperationsTest.class,
-    MetadataRegistrationTest.class
+    MetadataRegistrationTest.class,
+    IgniteClientConnectionEventListenerTest.class,
+    IgniteClientRequestEventListenerTest.class
 })
 public class ClientTestSuite {
     // No-op.