You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/04/06 19:44:17 UTC

[ignite] branch master updated: IGNITE-14492 Java thin client: Refactor notification listener - Fixes #8978.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 43123aa  IGNITE-14492 Java thin client: Refactor notification listener - Fixes #8978.
43123aa is described below

commit 43123aa8e66b1384382131ee7476e5b93f29b1d0
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Apr 6 22:36:09 2021 +0300

    IGNITE-14492 Java thin client: Refactor notification listener - Fixes #8978.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../ignite/internal/client/thin/ClientChannel.java |   9 +-
 .../internal/client/thin/ClientComputeImpl.java    | 249 +++++++--------------
 ...onListener.java => ClientNotificationType.java} |  18 +-
 .../internal/client/thin/ClientOperation.java      |  19 +-
 .../internal/client/thin/NotificationListener.java |  12 +-
 .../internal/client/thin/ReliableChannel.java      |  50 +----
 .../internal/client/thin/TcpClientChannel.java     | 109 ++++++++-
 .../internal/client/thin/ComputeTaskTest.java      |  12 +-
 .../internal/client/thin/ReliableChannelTest.java  |   8 +-
 9 files changed, 220 insertions(+), 266 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index 4f1b91e..90a1e4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -88,8 +88,15 @@ interface ClientChannel extends AutoCloseable {
 
     /**
      * Add notifications (from server to client) listener.
+     *
+     * @throws ClientConnectionException if channel is already closed.
+     */
+    public void addNotificationListener(ClientNotificationType type, Long rsrcId, NotificationListener lsnr);
+
+    /**
+     * Remove notifications (from server to client) listener.
      */
-    public void addNotificationListener(NotificationListener lsnr);
+    public void removeNotificationListener(ClientNotificationType type, Long rsrcId);
 
     /**
      * @return {@code True} channel is closed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index 305a138..7c220c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -19,19 +19,14 @@ package org.apache.ignite.internal.client.thin;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.client.ClientClusterGroup;
 import org.apache.ignite.client.ClientCompute;
@@ -39,25 +34,22 @@ import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.IgniteClientFuture;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.client.thin.ClientNotificationType.COMPUTE_TASK_FINISHED;
 import static org.apache.ignite.internal.client.thin.ClientOperation.COMPUTE_TASK_EXECUTE;
 import static org.apache.ignite.internal.client.thin.ClientOperation.RESOURCE_CLOSE;
 
 /**
  * Implementation of {@link ClientCompute}.
  */
-class ClientComputeImpl implements ClientCompute, NotificationListener {
+class ClientComputeImpl implements ClientCompute {
     /** No failover flag mask. */
     private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
 
@@ -73,11 +65,8 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
     /** Default cluster group. */
     private final ClientClusterGroupImpl dfltGrp;
 
-    /** Active tasks. */
-    private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>> activeTasks = new ConcurrentHashMap<>();
-
-    /** Guard lock for active tasks. */
-    private final ReadWriteLock guard = new ReentrantReadWriteLock();
+    /** Active tasks count. */
+    private final AtomicInteger tasksCnt = new AtomicInteger();
 
     /** Constructor. */
     ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl dfltGrp) {
@@ -85,24 +74,6 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
         this.dfltGrp = dfltGrp;
 
         utils = new ClientUtils(marsh);
-
-        ch.addNotificationListener(this);
-
-        ch.addChannelCloseListener(clientCh -> {
-            guard.writeLock().lock();
-
-            try {
-                Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.remove(clientCh);
-
-                if (!F.isEmpty(chTasks)) {
-                    for (ClientComputeTask<?> task : chTasks.values())
-                        task.fut.onDone(new ClientConnectionException("Channel to server is closed"));
-                }
-            }
-            finally {
-                guard.writeLock().unlock();
-            }
-        });
     }
 
     /** {@inheritDoc} */
@@ -178,9 +149,9 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
      * @return Resulting client exception.
      */
     private ClientException convertException(Throwable t) {
-        if (t instanceof ClientException) {
+        if (t instanceof ClientException)
             return (ClientException) t;
-        } else if (t.getCause() instanceof ClientException)
+        else if (t.getCause() instanceof ClientException)
             return (ClientException)t.getCause();
         else
             return new ClientException(t);
@@ -211,90 +182,71 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
         Consumer<PayloadOutputChannel> payloadWriter =
                 ch -> writeExecuteTaskRequest(ch, taskName, arg, nodeIds, flags, timeout);
 
-        Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader =
-                ch -> new T2<>(ch.clientChannel(), ch.in().readLong());
+        Function<PayloadInputChannel, ClientComputeTask<R>> payloadReader = ch -> {
+            Long taskId = ch.in().readLong();
 
-        IgniteClientFuture<T2<ClientChannel, Long>> initFut = ch.serviceAsync(
+            ClientComputeTask<R> task = new ClientComputeTask<>(utils, ch.clientChannel(), taskId);
+
+            ch.clientChannel().addNotificationListener(COMPUTE_TASK_FINISHED, taskId, task);
+
+            return task;
+        };
+
+        IgniteClientFuture<ClientComputeTask<R>> initFut = ch.serviceAsync(
                 COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader);
 
         CompletableFuture<R> resFut = new CompletableFuture<>();
         AtomicReference<Object> cancellationToken = new AtomicReference<>();
 
-        initFut.handle((taskParams, err) ->
-                handleExecuteInitFuture(payloadWriter, payloadReader, resFut, cancellationToken, taskParams, err));
+        initFut.handle((task, err) -> handleExecuteInitFuture(resFut, cancellationToken, task, err));
 
         return new IgniteClientFutureImpl<>(resFut, mayInterruptIfRunning -> {
             // 1. initFut has not completed - store cancellation flag.
             // 2. initFut has completed - cancel compute future.
             if (!cancellationToken.compareAndSet(null, mayInterruptIfRunning)) {
-                try {
-                    GridFutureAdapter<?> fut = (GridFutureAdapter<?>) cancellationToken.get();
+                GridFutureAdapter<?> fut = (GridFutureAdapter<?>) cancellationToken.get();
 
-                    if (cancelGridFuture(fut, mayInterruptIfRunning)) {
-                        resFut.cancel(mayInterruptIfRunning);
-                        return true;
-                    } else {
-                        return false;
-                    }
-                } catch (IgniteCheckedException e) {
-                    throw IgniteUtils.convertException(e);
-                }
+                if (!cancelGridFuture(fut, mayInterruptIfRunning))
+                    return false;
             }
 
             resFut.cancel(mayInterruptIfRunning);
+
             return true;
         });
     }
 
     /**
      * Handles execute initialization.
-     * @param payloadWriter Writer.
-     * @param payloadReader Reader.
+     *
      * @param resFut Resulting future.
      * @param cancellationToken Cancellation token holder.
-     * @param taskParams Task parameters.
+     * @param task Task.
      * @param err Error
      * @param <R> Result type.
      * @return Null.
      */
     private <R> Object handleExecuteInitFuture(
-            Consumer<PayloadOutputChannel> payloadWriter,
-            Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader,
             CompletableFuture<R> resFut,
             AtomicReference<Object> cancellationToken,
-            T2<ClientChannel, Long> taskParams,
+            ClientComputeTask<R> task,
             Throwable err) {
-        if (err != null) {
+        if (err != null)
             resFut.completeExceptionally(new ClientException(err));
-        } else {
-            ClientComputeTask<Object> task = addTask(taskParams.get1(), taskParams.get2());
-
-            if (task == null) {
-                // Channel closed - try again recursively.
-                ch.serviceAsync(COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader)
-                        .handle((r, e) ->
-                        handleExecuteInitFuture(payloadWriter, payloadReader, resFut, cancellationToken, r, e));
-            }
+        else {
+            if (!cancellationToken.compareAndSet(null, task.fut))
+                cancelGridFuture(task.fut, (Boolean) cancellationToken.get());
 
-            if (!cancellationToken.compareAndSet(null, task.fut)) {
-                try {
-                    cancelGridFuture(task.fut, (Boolean) cancellationToken.get());
-                } catch (IgniteCheckedException e) {
-                    throw U.convertException(e);
-                }
-            }
+            tasksCnt.incrementAndGet();
 
             task.fut.listen(f -> {
-                // Don't remove task if future was canceled by user. This task can be added again later by notification.
-                // To prevent leakage tasks for cancelled futures will be removed on notification (or channel close event).
-                if (!f.isCancelled()) {
-                    removeTask(task.ch, task.taskId);
+                tasksCnt.decrementAndGet();
 
-                    try {
-                        resFut.complete(((GridFutureAdapter<R>) f).get());
-                    } catch (IgniteCheckedException e) {
-                        resFut.completeExceptionally(e.getCause());
-                    }
+                if (!f.isCancelled()) {
+                    if (f.error() == null)
+                        resFut.complete(f.result());
+                    else
+                        resFut.completeExceptionally(f.error());
                 }
             });
         }
@@ -309,9 +261,13 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
      * @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
      *                             otherwise, in-progress tasks are allowed to complete.
      */
-    private static boolean cancelGridFuture(GridFutureAdapter<?> fut, Boolean mayInterruptIfRunning)
-            throws IgniteCheckedException {
-        return mayInterruptIfRunning ? fut.cancel() : fut.onCancelled();
+    private static boolean cancelGridFuture(GridFutureAdapter<?> fut, Boolean mayInterruptIfRunning) {
+        try {
+            return mayInterruptIfRunning ? fut.cancel() : fut.onCancelled();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 
     /**
@@ -349,93 +305,12 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void acceptNotification(
-        ClientChannel ch,
-        ClientOperation op,
-        long rsrcId,
-        ByteBuffer payload,
-        Exception err
-    ) {
-        if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
-            ClientComputeTask<Object> task = addTask(ch, rsrcId);
-
-            if (task != null) { // If channel is closed concurrently, task is already done with "channel closed" reason.
-                if (err == null) {
-                    try {
-                        Object res = payload == null ? null :
-                            utils.readObject(BinaryByteBufferInputStream.create(payload), false);
-
-                        task.fut.onDone(res);
-                    }
-                    catch (Throwable e) {
-                        task.fut.onDone(e);
-                    }
-                }
-                else
-                    task.fut.onDone(err);
-
-                if (task.fut.isCancelled())
-                    removeTask(ch, rsrcId);
-            }
-        }
-    }
-
     /**
-     * @param ch Client channel.
-     * @param taskId Task id.
-     * @return Already registered task, new task if task wasn't registered before, or {@code null} if channel was
-     * closed concurrently.
-     */
-    private ClientComputeTask<Object> addTask(ClientChannel ch, long taskId) {
-        guard.readLock().lock();
-
-        try {
-            // If channel is closed we should only get task if it was registered before, but not add new one.
-            boolean closed = ch.closed();
-
-            Map<Long, ClientComputeTask<Object>> chTasks = closed ? activeTasks.get(ch) :
-                activeTasks.computeIfAbsent(ch, c -> new ConcurrentHashMap<>());
-
-            if (chTasks == null)
-                return null;
-
-            return closed ? chTasks.get(taskId) :
-                chTasks.computeIfAbsent(taskId, t -> new ClientComputeTask<>(ch, taskId));
-        }
-        finally {
-            guard.readLock().unlock();
-        }
-    }
-
-    /**
-     * @param ch Client channel.
-     * @param taskId Task id.
-     */
-    private ClientComputeTask<Object> removeTask(ClientChannel ch, long taskId) {
-        Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.get(ch);
-
-        if (!F.isEmpty(chTasks))
-            return chTasks.remove(taskId);
-
-        return null;
-    }
-
-    /**
-     * Gets tasks future for active tasks started by client.
+     * Gets count of active tasks started by client.
      * Used only for tests.
-     *
-     * @return Map of active tasks keyed by their unique per client task ID.
      */
-    Map<IgniteUuid, IgniteInternalFuture<?>> activeTaskFutures() {
-        Map<IgniteUuid, IgniteInternalFuture<?>> res = new HashMap<>();
-
-        for (Map.Entry<ClientChannel, Map<Long, ClientComputeTask<Object>>> chTasks : activeTasks.entrySet()) {
-            for (Map.Entry<Long, ClientComputeTask<Object>> task : chTasks.getValue().entrySet())
-                res.put(new IgniteUuid(chTasks.getKey().serverNodeId(), task.getKey()), task.getValue().fut);
-        }
-
-        return res;
+    int activeTasksCount() {
+        return tasksCnt.get();
     }
 
     /**
@@ -508,7 +383,7 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
      *
      * @param <R> Result type.
      */
-    private static class ClientComputeTask<R> {
+    private static class ClientComputeTask<R> implements NotificationListener {
         /** Client channel. */
         private final ClientChannel ch;
 
@@ -518,11 +393,15 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
         /** Future. */
         private final GridFutureAdapter<R> fut;
 
+        /** */
+        private final ClientUtils utils;
+
         /**
          * @param ch Client channel.
          * @param taskId Task id.
          */
-        private ClientComputeTask(ClientChannel ch, long taskId) {
+        private ClientComputeTask(ClientUtils utils, ClientChannel ch, Long taskId) {
+            this.utils = utils;
             this.ch = ch;
             this.taskId = taskId;
 
@@ -547,5 +426,29 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
                 }
             };
         }
+
+        /** {@inheritDoc} */
+        @Override public void acceptNotification(ByteBuffer payload, Exception err) {
+            if (err == null) {
+                try {
+                    R res = payload == null ? null :
+                        utils.readObject(BinaryByteBufferInputStream.create(payload), false);
+
+                    fut.onDone(res);
+                }
+                catch (Throwable e) {
+                    fut.onDone(e);
+                }
+            }
+            else
+                fut.onDone(err);
+
+            ch.removeNotificationListener(COMPUTE_TASK_FINISHED, taskId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onChannelClosed(Exception reason) {
+            fut.onDone(new ClientConnectionException("Connection to server is closed", reason));
+        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
index 3aee483..f792908 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
@@ -17,20 +17,10 @@
 
 package org.apache.ignite.internal.client.thin;
 
-import java.nio.ByteBuffer;
-
 /**
- * Server to client notification listener.
+ * Notification types.
  */
-interface NotificationListener {
-    /**
-     * Accept notification.
-     *
-     * @param ch Client channel which was notified.
-     * @param op Client operation.
-     * @param rsrcId Resource id.
-     * @param payload Notification payload or {@code null} if there is no payload.
-     * @param err Error.
-     */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err);
+enum ClientNotificationType {
+    /** Compute task finished. */
+    COMPUTE_TASK_FINISHED;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 2ec8f45..987345d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -78,25 +78,26 @@ enum ClientOperation {
     /** Get nodes info by IDs. */CLUSTER_GROUP_GET_NODE_INFO(5101),
 
     /** Execute compute task. */COMPUTE_TASK_EXECUTE(6000),
-    /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true),
+    /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001,
+        ClientNotificationType.COMPUTE_TASK_FINISHED),
 
     /** Invoke service. */SERVICE_INVOKE(7000);
 
     /** Code. */
     private final int code;
 
-    /** Is notification. */
-    private final boolean notification;
+    /** Type of notification. */
+    private final ClientNotificationType notificationType;
 
     /** Constructor. */
     ClientOperation(int code) {
-        this(code, false);
+        this(code, null);
     }
 
     /** Constructor. */
-    ClientOperation(int code, boolean notification) {
+    ClientOperation(int code, ClientNotificationType notificationType) {
         this.code = code;
-        this.notification = notification;
+        this.notificationType = notificationType;
     }
 
     /**
@@ -107,10 +108,10 @@ enum ClientOperation {
     }
 
     /**
-     * @return {@code True} if operation is notification.
+     * @return Type of notification.
      */
-    public boolean isNotification() {
-        return notification;
+    public ClientNotificationType notificationType() {
+        return notificationType;
     }
 
     /** Enum mapping from code to values. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index 3aee483..e7e233f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -26,11 +26,15 @@ interface NotificationListener {
     /**
      * Accept notification.
      *
-     * @param ch Client channel which was notified.
-     * @param op Client operation.
-     * @param rsrcId Resource id.
      * @param payload Notification payload or {@code null} if there is no payload.
      * @param err Error.
      */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err);
+    public void acceptNotification(ByteBuffer payload, Exception err);
+
+    /**
+     * Handles connection loss.
+     *
+     * @param reason Exception that caused the disconnect, can be {@code null}.
+     */
+    public void onChannelClosed(Exception reason);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 195088d..3637543 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,7 +30,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -59,7 +57,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  * Communication channel with failover and partition awareness.
  */
-final class ReliableChannel implements AutoCloseable, NotificationListener {
+final class ReliableChannel implements AutoCloseable {
     /** Do nothing helper function. */
     private static final Consumer<Integer> DO_NOTHING = (v) -> {};
 
@@ -84,12 +82,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
     /** Node channels. */
     private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
 
-    /** Notification listeners. */
-    private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
-
-    /** Listeners of channel close events. */
-    private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>();
-
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
@@ -388,42 +380,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
     }
 
     /**
-     * Add notification listener.
-     *
-     * @param lsnr Listener.
-     */
-    public void addNotificationListener(NotificationListener lsnr) {
-        notificationLsnrs.add(lsnr);
-    }
-
-    /**
-     * Add listener of channel close event.
-     *
-     * @param lsnr Listener.
-     */
-    public void addChannelCloseListener(Consumer<ClientChannel> lsnr) {
-        channelCloseLsnrs.add(lsnr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void acceptNotification(
-        ClientChannel ch,
-        ClientOperation op,
-        long rsrcId,
-        ByteBuffer payload,
-        Exception err
-    ) {
-        for (NotificationListener lsnr : notificationLsnrs) {
-            try {
-                lsnr.acceptNotification(ch, op, rsrcId, payload, err);
-            }
-            catch (Exception ignore) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
      * Checks if affinity information for the cache is up to date and tries to update it if not.
      *
      * @return {@code True} if affinity information is up to date, {@code false} if there is not affinity information
@@ -922,7 +878,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
 
                     if (channel.serverNodeId() != null) {
                         channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
-                        channel.addNotificationListener(ReliableChannel.this);
 
                         UUID prevId = serverNodeId;
 
@@ -952,9 +907,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
             if (ch != null) {
                 U.closeQuiet(ch);
 
-                for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
-                    lsnr.accept(ch);
-
                 ch = null;
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 11a5016..3cd8688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -24,14 +24,18 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
@@ -62,6 +66,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientFlag;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -123,7 +128,16 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();
 
     /** Notification listeners. */
-    private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+    @SuppressWarnings("unchecked")
+    private final Map<Long, NotificationListener>[] notificationLsnrs = new Map[ClientNotificationType.values().length];
+
+    /** Pending notification. */
+    @SuppressWarnings("unchecked")
+    private final Map<Long, Queue<T2<ByteBuffer, Exception>>>[] pendingNotifications =
+        new Map[ClientNotificationType.values().length];
+
+    /** Guard for notification listeners. */
+    private final ReadWriteLock notificationLsnrsGuard = new ReentrantReadWriteLock();
 
     /** Closed flag. */
     private final AtomicBoolean closed = new AtomicBoolean();
@@ -139,6 +153,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
         validateConfiguration(cfg);
 
+        for (ClientNotificationType type : ClientNotificationType.values())
+            pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
+
         Executor cfgExec = cfg.getAsyncContinuationExecutor();
         asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();
 
@@ -167,12 +184,24 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /**
      * Close the channel with cause.
      */
-    private void close(Throwable cause) {
+    private void close(Exception cause) {
         if (closed.compareAndSet(false, true)) {
             U.closeQuiet(sock);
 
             for (ClientRequestFuture pendingReq : pendingReqs.values())
                 pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
+
+            notificationLsnrsGuard.readLock().lock();
+
+            try {
+                for (Map<Long, NotificationListener> lsnrs : notificationLsnrs) {
+                    if (lsnrs != null)
+                        lsnrs.values().forEach(lsnr -> lsnr.onChannelClosed(cause));
+                }
+            }
+            finally {
+                notificationLsnrsGuard.readLock().unlock();
+            }
         }
     }
 
@@ -341,7 +370,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             return;
         }
 
-        long resId = dataInput.readLong();
+        Long resId = dataInput.readLong();
 
         int status = 0;
 
@@ -365,7 +394,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
                 notificationOp = ClientOperation.fromCode(notificationCode);
 
-                if (notificationOp == null || !notificationOp.isNotification())
+                if (notificationOp == null || notificationOp.notificationType() == null)
                     throw new ClientProtocolError(String.format("Unexpected notification code [%d]", notificationCode));
             }
 
@@ -405,11 +434,30 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             pendingReq.onDone(res, err);
         }
         else { // Notification received.
-            ClientOperation notificationOp0 = notificationOp;
+            ClientNotificationType notificationType = notificationOp.notificationType();
 
             asyncContinuationExecutor.execute(() -> {
-                for (NotificationListener lsnr : notificationLsnrs)
-                    lsnr.acceptNotification(this, notificationOp0, resId, res, err);
+                NotificationListener lsnr = null;
+
+                notificationLsnrsGuard.readLock().lock();
+
+                try {
+                    Map<Long, NotificationListener> lsrns = notificationLsnrs[notificationType.ordinal()];
+
+                    if (lsrns != null)
+                        lsnr = lsrns.get(resId);
+
+                    if (lsnr == null) {
+                        pendingNotifications[notificationType.ordinal()].computeIfAbsent(resId,
+                            k -> new ConcurrentLinkedQueue<>()).add(new T2<>(res, err));
+                    }
+                }
+                finally {
+                    notificationLsnrsGuard.readLock().unlock();
+                }
+
+                if (lsnr != null)
+                    lsnr.acceptNotification(res, err);
             });
         }
     }
@@ -435,8 +483,51 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     }
 
     /** {@inheritDoc} */
-    @Override public void addNotificationListener(NotificationListener lsnr) {
-        notificationLsnrs.add(lsnr);
+    @Override public void addNotificationListener(ClientNotificationType type, Long rsrcId, NotificationListener lsnr) {
+        Queue<T2<ByteBuffer, Exception>> pendingQueue;
+
+        notificationLsnrsGuard.writeLock().lock();
+
+        try {
+            if (closed())
+                throw new ClientConnectionException("Channel is closed");
+
+            Map<Long, NotificationListener> lsnrs = notificationLsnrs[type.ordinal()];
+
+            if (lsnrs == null)
+                notificationLsnrs[type.ordinal()] = lsnrs = new ConcurrentHashMap<>();
+
+            lsnrs.put(rsrcId, lsnr);
+
+            pendingQueue = pendingNotifications[type.ordinal()].remove(rsrcId);
+        }
+        finally {
+            notificationLsnrsGuard.writeLock().unlock();
+        }
+
+        // Drain pending notifications queue.
+        if (pendingQueue != null)
+            pendingQueue.forEach(n -> lsnr.acceptNotification(n.get1(), n.get2()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeNotificationListener(ClientNotificationType type, Long rsrcId) {
+        notificationLsnrsGuard.writeLock().lock();
+
+        try {
+            Map<Long, NotificationListener> lsnrs = notificationLsnrs[type.ordinal()];
+
+            if (lsnrs == null)
+                return;
+
+            lsnrs.remove(rsrcId);
+
+            pendingNotifications[type.ordinal()].remove(rsrcId);
+
+        }
+        finally {
+            notificationLsnrsGuard.writeLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 437798b..69d6fda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -61,7 +61,7 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 /**
- * Checks compute grid funtionality of thin client.
+ * Checks compute grid functionality of thin client.
  */
 public class ComputeTaskTest extends AbstractThinClientTest {
     /** Grids count. */
@@ -245,7 +245,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
             assertTrue(fut.cancel(true));
 
             assertTrue(GridTestUtils.waitForCondition(
-                () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+                () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
 
             assertTrue(fut.isCancelled());
             assertTrue(fut.isDone());
@@ -274,7 +274,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
             fut.cancel(true);
 
             assertTrue(GridTestUtils.waitForCondition(
-                () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+                () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
 
             assertTrue(fut.isCancelled());
             assertTrue(fut.isDone());
@@ -443,7 +443,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
 
             fut3.get(TIMEOUT, TimeUnit.MILLISECONDS);
 
-            assertTrue(GridTestUtils.waitForCondition(() -> compute.activeTaskFutures().isEmpty(), TIMEOUT));
+            assertTrue(GridTestUtils.waitForCondition(() -> compute.activeTasksCount() == 0, TIMEOUT));
         }
     }
 
@@ -529,7 +529,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
                 futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
 
             assertTrue(GridTestUtils.waitForCondition(
-                    () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().size() == ACTIVE_TASKS_LIMIT,
+                    () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == ACTIVE_TASKS_LIMIT,
                     TIMEOUT));
 
             // Check that we can't start more tasks.
@@ -628,7 +628,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
                 }, threadsCnt, "run-task-async");
 
             assertTrue(GridTestUtils.waitForCondition(
-                () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+                () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 686a193..f61083b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -366,7 +366,13 @@ public class ReliableChannelTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void addNotificationListener(NotificationListener lsnr) {
+        @Override public void addNotificationListener(ClientNotificationType type, Long rsrcId,
+            NotificationListener lsnr) {
+            /* No-op */
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNotificationListener(ClientNotificationType type, Long rsrcId) {
             /* No-op */
         }