You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/04/06 19:44:17 UTC
[ignite] branch master updated: IGNITE-14492 Java thin client:
Refactor notification listener - Fixes #8978.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 43123aa IGNITE-14492 Java thin client: Refactor notification listener - Fixes #8978.
43123aa is described below
commit 43123aa8e66b1384382131ee7476e5b93f29b1d0
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Apr 6 22:36:09 2021 +0300
IGNITE-14492 Java thin client: Refactor notification listener - Fixes #8978.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../ignite/internal/client/thin/ClientChannel.java | 9 +-
.../internal/client/thin/ClientComputeImpl.java | 249 +++++++--------------
...onListener.java => ClientNotificationType.java} | 18 +-
.../internal/client/thin/ClientOperation.java | 19 +-
.../internal/client/thin/NotificationListener.java | 12 +-
.../internal/client/thin/ReliableChannel.java | 50 +----
.../internal/client/thin/TcpClientChannel.java | 109 ++++++++-
.../internal/client/thin/ComputeTaskTest.java | 12 +-
.../internal/client/thin/ReliableChannelTest.java | 8 +-
9 files changed, 220 insertions(+), 266 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index 4f1b91e..90a1e4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -88,8 +88,15 @@ interface ClientChannel extends AutoCloseable {
/**
* Add notifications (from server to client) listener.
+ *
+ * @throws ClientConnectionException if channel is already closed.
+ */
+ public void addNotificationListener(ClientNotificationType type, Long rsrcId, NotificationListener lsnr);
+
+ /**
+ * Remove notifications (from server to client) listener.
*/
- public void addNotificationListener(NotificationListener lsnr);
+ public void removeNotificationListener(ClientNotificationType type, Long rsrcId);
/**
* @return {@code True} channel is closed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index 305a138..7c220c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -19,19 +19,14 @@ package org.apache.ignite.internal.client.thin;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.ClientClusterGroup;
import org.apache.ignite.client.ClientCompute;
@@ -39,25 +34,22 @@ import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.client.thin.ClientNotificationType.COMPUTE_TASK_FINISHED;
import static org.apache.ignite.internal.client.thin.ClientOperation.COMPUTE_TASK_EXECUTE;
import static org.apache.ignite.internal.client.thin.ClientOperation.RESOURCE_CLOSE;
/**
* Implementation of {@link ClientCompute}.
*/
-class ClientComputeImpl implements ClientCompute, NotificationListener {
+class ClientComputeImpl implements ClientCompute {
/** No failover flag mask. */
private static final byte NO_FAILOVER_FLAG_MASK = 0x01;
@@ -73,11 +65,8 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
/** Default cluster group. */
private final ClientClusterGroupImpl dfltGrp;
- /** Active tasks. */
- private final Map<ClientChannel, Map<Long, ClientComputeTask<Object>>> activeTasks = new ConcurrentHashMap<>();
-
- /** Guard lock for active tasks. */
- private final ReadWriteLock guard = new ReentrantReadWriteLock();
+ /** Active tasks count. */
+ private final AtomicInteger tasksCnt = new AtomicInteger();
/** Constructor. */
ClientComputeImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl dfltGrp) {
@@ -85,24 +74,6 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
this.dfltGrp = dfltGrp;
utils = new ClientUtils(marsh);
-
- ch.addNotificationListener(this);
-
- ch.addChannelCloseListener(clientCh -> {
- guard.writeLock().lock();
-
- try {
- Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.remove(clientCh);
-
- if (!F.isEmpty(chTasks)) {
- for (ClientComputeTask<?> task : chTasks.values())
- task.fut.onDone(new ClientConnectionException("Channel to server is closed"));
- }
- }
- finally {
- guard.writeLock().unlock();
- }
- });
}
/** {@inheritDoc} */
@@ -178,9 +149,9 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
* @return Resulting client exception.
*/
private ClientException convertException(Throwable t) {
- if (t instanceof ClientException) {
+ if (t instanceof ClientException)
return (ClientException) t;
- } else if (t.getCause() instanceof ClientException)
+ else if (t.getCause() instanceof ClientException)
return (ClientException)t.getCause();
else
return new ClientException(t);
@@ -211,90 +182,71 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
Consumer<PayloadOutputChannel> payloadWriter =
ch -> writeExecuteTaskRequest(ch, taskName, arg, nodeIds, flags, timeout);
- Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader =
- ch -> new T2<>(ch.clientChannel(), ch.in().readLong());
+ Function<PayloadInputChannel, ClientComputeTask<R>> payloadReader = ch -> {
+ Long taskId = ch.in().readLong();
- IgniteClientFuture<T2<ClientChannel, Long>> initFut = ch.serviceAsync(
+ ClientComputeTask<R> task = new ClientComputeTask<>(utils, ch.clientChannel(), taskId);
+
+ ch.clientChannel().addNotificationListener(COMPUTE_TASK_FINISHED, taskId, task);
+
+ return task;
+ };
+
+ IgniteClientFuture<ClientComputeTask<R>> initFut = ch.serviceAsync(
COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader);
CompletableFuture<R> resFut = new CompletableFuture<>();
AtomicReference<Object> cancellationToken = new AtomicReference<>();
- initFut.handle((taskParams, err) ->
- handleExecuteInitFuture(payloadWriter, payloadReader, resFut, cancellationToken, taskParams, err));
+ initFut.handle((task, err) -> handleExecuteInitFuture(resFut, cancellationToken, task, err));
return new IgniteClientFutureImpl<>(resFut, mayInterruptIfRunning -> {
// 1. initFut has not completed - store cancellation flag.
// 2. initFut has completed - cancel compute future.
if (!cancellationToken.compareAndSet(null, mayInterruptIfRunning)) {
- try {
- GridFutureAdapter<?> fut = (GridFutureAdapter<?>) cancellationToken.get();
+ GridFutureAdapter<?> fut = (GridFutureAdapter<?>) cancellationToken.get();
- if (cancelGridFuture(fut, mayInterruptIfRunning)) {
- resFut.cancel(mayInterruptIfRunning);
- return true;
- } else {
- return false;
- }
- } catch (IgniteCheckedException e) {
- throw IgniteUtils.convertException(e);
- }
+ if (!cancelGridFuture(fut, mayInterruptIfRunning))
+ return false;
}
resFut.cancel(mayInterruptIfRunning);
+
return true;
});
}
/**
* Handles execute initialization.
- * @param payloadWriter Writer.
- * @param payloadReader Reader.
+ *
* @param resFut Resulting future.
* @param cancellationToken Cancellation token holder.
- * @param taskParams Task parameters.
+ * @param task Task.
* @param err Error
* @param <R> Result type.
* @return Null.
*/
private <R> Object handleExecuteInitFuture(
- Consumer<PayloadOutputChannel> payloadWriter,
- Function<PayloadInputChannel, T2<ClientChannel, Long>> payloadReader,
CompletableFuture<R> resFut,
AtomicReference<Object> cancellationToken,
- T2<ClientChannel, Long> taskParams,
+ ClientComputeTask<R> task,
Throwable err) {
- if (err != null) {
+ if (err != null)
resFut.completeExceptionally(new ClientException(err));
- } else {
- ClientComputeTask<Object> task = addTask(taskParams.get1(), taskParams.get2());
-
- if (task == null) {
- // Channel closed - try again recursively.
- ch.serviceAsync(COMPUTE_TASK_EXECUTE, payloadWriter, payloadReader)
- .handle((r, e) ->
- handleExecuteInitFuture(payloadWriter, payloadReader, resFut, cancellationToken, r, e));
- }
+ else {
+ if (!cancellationToken.compareAndSet(null, task.fut))
+ cancelGridFuture(task.fut, (Boolean) cancellationToken.get());
- if (!cancellationToken.compareAndSet(null, task.fut)) {
- try {
- cancelGridFuture(task.fut, (Boolean) cancellationToken.get());
- } catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
+ tasksCnt.incrementAndGet();
task.fut.listen(f -> {
- // Don't remove task if future was canceled by user. This task can be added again later by notification.
- // To prevent leakage tasks for cancelled futures will be removed on notification (or channel close event).
- if (!f.isCancelled()) {
- removeTask(task.ch, task.taskId);
+ tasksCnt.decrementAndGet();
- try {
- resFut.complete(((GridFutureAdapter<R>) f).get());
- } catch (IgniteCheckedException e) {
- resFut.completeExceptionally(e.getCause());
- }
+ if (!f.isCancelled()) {
+ if (f.error() == null)
+ resFut.complete(f.result());
+ else
+ resFut.completeExceptionally(f.error());
}
});
}
@@ -309,9 +261,13 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
* @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
* otherwise, in-progress tasks are allowed to complete.
*/
- private static boolean cancelGridFuture(GridFutureAdapter<?> fut, Boolean mayInterruptIfRunning)
- throws IgniteCheckedException {
- return mayInterruptIfRunning ? fut.cancel() : fut.onCancelled();
+ private static boolean cancelGridFuture(GridFutureAdapter<?> fut, Boolean mayInterruptIfRunning) {
+ try {
+ return mayInterruptIfRunning ? fut.cancel() : fut.onCancelled();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/**
@@ -349,93 +305,12 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
}
}
- /** {@inheritDoc} */
- @Override public void acceptNotification(
- ClientChannel ch,
- ClientOperation op,
- long rsrcId,
- ByteBuffer payload,
- Exception err
- ) {
- if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
- ClientComputeTask<Object> task = addTask(ch, rsrcId);
-
- if (task != null) { // If channel is closed concurrently, task is already done with "channel closed" reason.
- if (err == null) {
- try {
- Object res = payload == null ? null :
- utils.readObject(BinaryByteBufferInputStream.create(payload), false);
-
- task.fut.onDone(res);
- }
- catch (Throwable e) {
- task.fut.onDone(e);
- }
- }
- else
- task.fut.onDone(err);
-
- if (task.fut.isCancelled())
- removeTask(ch, rsrcId);
- }
- }
- }
-
/**
- * @param ch Client channel.
- * @param taskId Task id.
- * @return Already registered task, new task if task wasn't registered before, or {@code null} if channel was
- * closed concurrently.
- */
- private ClientComputeTask<Object> addTask(ClientChannel ch, long taskId) {
- guard.readLock().lock();
-
- try {
- // If channel is closed we should only get task if it was registered before, but not add new one.
- boolean closed = ch.closed();
-
- Map<Long, ClientComputeTask<Object>> chTasks = closed ? activeTasks.get(ch) :
- activeTasks.computeIfAbsent(ch, c -> new ConcurrentHashMap<>());
-
- if (chTasks == null)
- return null;
-
- return closed ? chTasks.get(taskId) :
- chTasks.computeIfAbsent(taskId, t -> new ClientComputeTask<>(ch, taskId));
- }
- finally {
- guard.readLock().unlock();
- }
- }
-
- /**
- * @param ch Client channel.
- * @param taskId Task id.
- */
- private ClientComputeTask<Object> removeTask(ClientChannel ch, long taskId) {
- Map<Long, ClientComputeTask<Object>> chTasks = activeTasks.get(ch);
-
- if (!F.isEmpty(chTasks))
- return chTasks.remove(taskId);
-
- return null;
- }
-
- /**
- * Gets tasks future for active tasks started by client.
+ * Gets count of active tasks started by client.
* Used only for tests.
- *
- * @return Map of active tasks keyed by their unique per client task ID.
*/
- Map<IgniteUuid, IgniteInternalFuture<?>> activeTaskFutures() {
- Map<IgniteUuid, IgniteInternalFuture<?>> res = new HashMap<>();
-
- for (Map.Entry<ClientChannel, Map<Long, ClientComputeTask<Object>>> chTasks : activeTasks.entrySet()) {
- for (Map.Entry<Long, ClientComputeTask<Object>> task : chTasks.getValue().entrySet())
- res.put(new IgniteUuid(chTasks.getKey().serverNodeId(), task.getKey()), task.getValue().fut);
- }
-
- return res;
+ int activeTasksCount() {
+ return tasksCnt.get();
}
/**
@@ -508,7 +383,7 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
*
* @param <R> Result type.
*/
- private static class ClientComputeTask<R> {
+ private static class ClientComputeTask<R> implements NotificationListener {
/** Client channel. */
private final ClientChannel ch;
@@ -518,11 +393,15 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
/** Future. */
private final GridFutureAdapter<R> fut;
+ /** */
+ private final ClientUtils utils;
+
/**
* @param ch Client channel.
* @param taskId Task id.
*/
- private ClientComputeTask(ClientChannel ch, long taskId) {
+ private ClientComputeTask(ClientUtils utils, ClientChannel ch, Long taskId) {
+ this.utils = utils;
this.ch = ch;
this.taskId = taskId;
@@ -547,5 +426,29 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
}
};
}
+
+ /** {@inheritDoc} */
+ @Override public void acceptNotification(ByteBuffer payload, Exception err) {
+ if (err == null) {
+ try {
+ R res = payload == null ? null :
+ utils.readObject(BinaryByteBufferInputStream.create(payload), false);
+
+ fut.onDone(res);
+ }
+ catch (Throwable e) {
+ fut.onDone(e);
+ }
+ }
+ else
+ fut.onDone(err);
+
+ ch.removeNotificationListener(COMPUTE_TASK_FINISHED, taskId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onChannelClosed(Exception reason) {
+ fut.onDone(new ClientConnectionException("Connection to server is closed", reason));
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
index 3aee483..f792908 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
@@ -17,20 +17,10 @@
package org.apache.ignite.internal.client.thin;
-import java.nio.ByteBuffer;
-
/**
- * Server to client notification listener.
+ * Notification types.
*/
-interface NotificationListener {
- /**
- * Accept notification.
- *
- * @param ch Client channel which was notified.
- * @param op Client operation.
- * @param rsrcId Resource id.
- * @param payload Notification payload or {@code null} if there is no payload.
- * @param err Error.
- */
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err);
+enum ClientNotificationType {
+ /** Compute task finished. */
+ COMPUTE_TASK_FINISHED;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 2ec8f45..987345d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -78,25 +78,26 @@ enum ClientOperation {
/** Get nodes info by IDs. */CLUSTER_GROUP_GET_NODE_INFO(5101),
/** Execute compute task. */COMPUTE_TASK_EXECUTE(6000),
- /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true),
+ /** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001,
+ ClientNotificationType.COMPUTE_TASK_FINISHED),
/** Invoke service. */SERVICE_INVOKE(7000);
/** Code. */
private final int code;
- /** Is notification. */
- private final boolean notification;
+ /** Type of notification. */
+ private final ClientNotificationType notificationType;
/** Constructor. */
ClientOperation(int code) {
- this(code, false);
+ this(code, null);
}
/** Constructor. */
- ClientOperation(int code, boolean notification) {
+ ClientOperation(int code, ClientNotificationType notificationType) {
this.code = code;
- this.notification = notification;
+ this.notificationType = notificationType;
}
/**
@@ -107,10 +108,10 @@ enum ClientOperation {
}
/**
- * @return {@code True} if operation is notification.
+ * @return Type of notification.
*/
- public boolean isNotification() {
- return notification;
+ public ClientNotificationType notificationType() {
+ return notificationType;
}
/** Enum mapping from code to values. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index 3aee483..e7e233f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -26,11 +26,15 @@ interface NotificationListener {
/**
* Accept notification.
*
- * @param ch Client channel which was notified.
- * @param op Client operation.
- * @param rsrcId Resource id.
* @param payload Notification payload or {@code null} if there is no payload.
* @param err Error.
*/
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err);
+ public void acceptNotification(ByteBuffer payload, Exception err);
+
+ /**
+ * Handles connection loss.
+ *
+ * @param reason Exception that caused the disconnect, can be {@code null}.
+ */
+ public void onChannelClosed(Exception reason);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 195088d..3637543 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -31,7 +30,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
@@ -59,7 +57,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Communication channel with failover and partition awareness.
*/
-final class ReliableChannel implements AutoCloseable, NotificationListener {
+final class ReliableChannel implements AutoCloseable {
/** Do nothing helper function. */
private static final Consumer<Integer> DO_NOTHING = (v) -> {};
@@ -84,12 +82,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/** Node channels. */
private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
- /** Notification listeners. */
- private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
-
- /** Listeners of channel close events. */
- private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>();
-
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
@@ -388,42 +380,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
}
/**
- * Add notification listener.
- *
- * @param lsnr Listener.
- */
- public void addNotificationListener(NotificationListener lsnr) {
- notificationLsnrs.add(lsnr);
- }
-
- /**
- * Add listener of channel close event.
- *
- * @param lsnr Listener.
- */
- public void addChannelCloseListener(Consumer<ClientChannel> lsnr) {
- channelCloseLsnrs.add(lsnr);
- }
-
- /** {@inheritDoc} */
- @Override public void acceptNotification(
- ClientChannel ch,
- ClientOperation op,
- long rsrcId,
- ByteBuffer payload,
- Exception err
- ) {
- for (NotificationListener lsnr : notificationLsnrs) {
- try {
- lsnr.acceptNotification(ch, op, rsrcId, payload, err);
- }
- catch (Exception ignore) {
- // No-op.
- }
- }
- }
-
- /**
* Checks if affinity information for the cache is up to date and tries to update it if not.
*
* @return {@code True} if affinity information is up to date, {@code false} if there is not affinity information
@@ -922,7 +878,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
if (channel.serverNodeId() != null) {
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
- channel.addNotificationListener(ReliableChannel.this);
UUID prevId = serverNodeId;
@@ -952,9 +907,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
if (ch != null) {
U.closeQuiet(ch);
- for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
- lsnr.accept(ch);
-
ch = null;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 11a5016..3cd8688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -24,14 +24,18 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
+import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
@@ -62,6 +66,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientFlag;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -123,7 +128,16 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();
/** Notification listeners. */
- private final Collection<NotificationListener> notificationLsnrs = new CopyOnWriteArrayList<>();
+ @SuppressWarnings("unchecked")
+ private final Map<Long, NotificationListener>[] notificationLsnrs = new Map[ClientNotificationType.values().length];
+
+ /** Pending notification. */
+ @SuppressWarnings("unchecked")
+ private final Map<Long, Queue<T2<ByteBuffer, Exception>>>[] pendingNotifications =
+ new Map[ClientNotificationType.values().length];
+
+ /** Guard for notification listeners. */
+ private final ReadWriteLock notificationLsnrsGuard = new ReentrantReadWriteLock();
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -139,6 +153,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
validateConfiguration(cfg);
+ for (ClientNotificationType type : ClientNotificationType.values())
+ pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
+
Executor cfgExec = cfg.getAsyncContinuationExecutor();
asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();
@@ -167,12 +184,24 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/**
* Close the channel with cause.
*/
- private void close(Throwable cause) {
+ private void close(Exception cause) {
if (closed.compareAndSet(false, true)) {
U.closeQuiet(sock);
for (ClientRequestFuture pendingReq : pendingReqs.values())
pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
+
+ notificationLsnrsGuard.readLock().lock();
+
+ try {
+ for (Map<Long, NotificationListener> lsnrs : notificationLsnrs) {
+ if (lsnrs != null)
+ lsnrs.values().forEach(lsnr -> lsnr.onChannelClosed(cause));
+ }
+ }
+ finally {
+ notificationLsnrsGuard.readLock().unlock();
+ }
}
}
@@ -341,7 +370,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return;
}
- long resId = dataInput.readLong();
+ Long resId = dataInput.readLong();
int status = 0;
@@ -365,7 +394,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
notificationOp = ClientOperation.fromCode(notificationCode);
- if (notificationOp == null || !notificationOp.isNotification())
+ if (notificationOp == null || notificationOp.notificationType() == null)
throw new ClientProtocolError(String.format("Unexpected notification code [%d]", notificationCode));
}
@@ -405,11 +434,30 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
pendingReq.onDone(res, err);
}
else { // Notification received.
- ClientOperation notificationOp0 = notificationOp;
+ ClientNotificationType notificationType = notificationOp.notificationType();
asyncContinuationExecutor.execute(() -> {
- for (NotificationListener lsnr : notificationLsnrs)
- lsnr.acceptNotification(this, notificationOp0, resId, res, err);
+ NotificationListener lsnr = null;
+
+ notificationLsnrsGuard.readLock().lock();
+
+ try {
+ Map<Long, NotificationListener> lsrns = notificationLsnrs[notificationType.ordinal()];
+
+ if (lsrns != null)
+ lsnr = lsrns.get(resId);
+
+ if (lsnr == null) {
+ pendingNotifications[notificationType.ordinal()].computeIfAbsent(resId,
+ k -> new ConcurrentLinkedQueue<>()).add(new T2<>(res, err));
+ }
+ }
+ finally {
+ notificationLsnrsGuard.readLock().unlock();
+ }
+
+ if (lsnr != null)
+ lsnr.acceptNotification(res, err);
});
}
}
@@ -435,8 +483,51 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
/** {@inheritDoc} */
- @Override public void addNotificationListener(NotificationListener lsnr) {
- notificationLsnrs.add(lsnr);
+ @Override public void addNotificationListener(ClientNotificationType type, Long rsrcId, NotificationListener lsnr) {
+ Queue<T2<ByteBuffer, Exception>> pendingQueue;
+
+ notificationLsnrsGuard.writeLock().lock();
+
+ try {
+ if (closed())
+ throw new ClientConnectionException("Channel is closed");
+
+ Map<Long, NotificationListener> lsnrs = notificationLsnrs[type.ordinal()];
+
+ if (lsnrs == null)
+ notificationLsnrs[type.ordinal()] = lsnrs = new ConcurrentHashMap<>();
+
+ lsnrs.put(rsrcId, lsnr);
+
+ pendingQueue = pendingNotifications[type.ordinal()].remove(rsrcId);
+ }
+ finally {
+ notificationLsnrsGuard.writeLock().unlock();
+ }
+
+ // Drain pending notifications queue.
+ if (pendingQueue != null)
+ pendingQueue.forEach(n -> lsnr.acceptNotification(n.get1(), n.get2()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNotificationListener(ClientNotificationType type, Long rsrcId) {
+ notificationLsnrsGuard.writeLock().lock();
+
+ try {
+ Map<Long, NotificationListener> lsnrs = notificationLsnrs[type.ordinal()];
+
+ if (lsnrs == null)
+ return;
+
+ lsnrs.remove(rsrcId);
+
+ pendingNotifications[type.ordinal()].remove(rsrcId);
+
+ }
+ finally {
+ notificationLsnrsGuard.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 437798b..69d6fda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -61,7 +61,7 @@ import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
- * Checks compute grid funtionality of thin client.
+ * Checks compute grid functionality of thin client.
*/
public class ComputeTaskTest extends AbstractThinClientTest {
/** Grids count. */
@@ -245,7 +245,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
assertTrue(fut.cancel(true));
assertTrue(GridTestUtils.waitForCondition(
- () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+ () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
assertTrue(fut.isCancelled());
assertTrue(fut.isDone());
@@ -274,7 +274,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
fut.cancel(true);
assertTrue(GridTestUtils.waitForCondition(
- () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+ () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
assertTrue(fut.isCancelled());
assertTrue(fut.isDone());
@@ -443,7 +443,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
fut3.get(TIMEOUT, TimeUnit.MILLISECONDS);
- assertTrue(GridTestUtils.waitForCondition(() -> compute.activeTaskFutures().isEmpty(), TIMEOUT));
+ assertTrue(GridTestUtils.waitForCondition(() -> compute.activeTasksCount() == 0, TIMEOUT));
}
}
@@ -529,7 +529,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
futs.add(compute.executeAsync(TestLatchTask.class.getName(), null));
assertTrue(GridTestUtils.waitForCondition(
- () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().size() == ACTIVE_TASKS_LIMIT,
+ () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == ACTIVE_TASKS_LIMIT,
TIMEOUT));
// Check that we can't start more tasks.
@@ -628,7 +628,7 @@ public class ComputeTaskTest extends AbstractThinClientTest {
}, threadsCnt, "run-task-async");
assertTrue(GridTestUtils.waitForCondition(
- () -> ((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
+ () -> ((ClientComputeImpl)client.compute()).activeTasksCount() == 0, TIMEOUT));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 686a193..f61083b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -366,7 +366,13 @@ public class ReliableChannelTest {
}
/** {@inheritDoc} */
- @Override public void addNotificationListener(NotificationListener lsnr) {
+ @Override public void addNotificationListener(ClientNotificationType type, Long rsrcId,
+ NotificationListener lsnr) {
+ /* No-op */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNotificationListener(ClientNotificationType type, Long rsrcId) {
/* No-op */
}