You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/21 17:07:00 UTC
[ignite-3] branch main updated: IGNITE-14762 Fix preemptive message handler execution #143
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 70e0c1c IGNITE-14762 Fix preemptive message handler execution #143
70e0c1c is described below
commit 70e0c1c99bff004da2c4f5f45f1df7759d82f299
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri May 21 20:06:01 2021 +0300
IGNITE-14762 Fix preemptive message handler execution #143
Signed-off-by: Andrey Gura <ag...@apache.org>
---
.../scalecube/DelegatingTransportFactory.java | 30 ++++++++++++++--------
1 file changed, 19 insertions(+), 11 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
index a14d30f..3d04557 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
@@ -17,12 +17,13 @@
package org.apache.ignite.network.scalecube;
+import java.util.Objects;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
-import java.util.Objects;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -98,16 +99,23 @@ class DelegatingTransportFactory implements TransportFactory {
Objects.requireNonNull(request, "request must be not null");
Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
- Mono<Message> result = listen()
- .filter(resp -> resp.correlationId() != null)
- .filter(resp -> resp.correlationId().equals(request.correlationId()))
- .next();
-
- // manually fire the message event instead of sending it, because otherwise it will be received
- // immediately, replacing the response that might have been sent by the event handlers.
- messagingService.fireEvent(request);
-
- return result;
+ return Mono.create(sink -> {
+ // listen() returns a lazy Flux, so we need to subscribe to it first, using a sink.
+ // Otherwise, message handlers can execute faster than the consumer will be able to start listening
+ // to incoming messages.
+ Disposable disposable = listen()
+ .filter(resp -> resp.correlationId() != null)
+ .filter(resp -> resp.correlationId().equals(request.correlationId()))
+ .take(1)
+ .subscribe(sink::success, sink::error, sink::success);
+
+ // cancel the nested subscription if the client cancels the outer Mono
+ sink.onDispose(disposable);
+
+ // manually fire the message event instead of sending it, because otherwise it will be received
+ // immediately, replacing the response that might have been sent by the event handlers.
+ messagingService.fireEvent(request);
+ });
}
};
}