You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/21 17:07:00 UTC

[ignite-3] branch main updated: IGNITE-14762 Fix preemptive message handler execution #143

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 70e0c1c  IGNITE-14762 Fix preemptive message handler execution #143
70e0c1c is described below

commit 70e0c1c99bff004da2c4f5f45f1df7759d82f299
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri May 21 20:06:01 2021 +0300

    IGNITE-14762 Fix preemptive message handler execution #143
    
    Signed-off-by: Andrey Gura <ag...@apache.org>
---
 .../scalecube/DelegatingTransportFactory.java      | 30 ++++++++++++++--------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
index a14d30f..3d04557 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.network.scalecube;
 
+import java.util.Objects;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.Transport;
 import io.scalecube.cluster.transport.api.TransportConfig;
 import io.scalecube.cluster.transport.api.TransportFactory;
 import io.scalecube.net.Address;
-import java.util.Objects;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -98,16 +99,23 @@ class DelegatingTransportFactory implements TransportFactory {
                 Objects.requireNonNull(request, "request must be not null");
                 Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
 
-                Mono<Message> result = listen()
-                    .filter(resp -> resp.correlationId() != null)
-                    .filter(resp -> resp.correlationId().equals(request.correlationId()))
-                    .next();
-
-                // manually fire the message event instead of sending it, because otherwise it will be received
-                // immediately, replacing the response that might have been sent by the event handlers.
-                messagingService.fireEvent(request);
-
-                return result;
+                return Mono.create(sink -> {
+                    // listen() returns a lazy Flux, so we need to subscribe to it first, using a sink.
+                    // Otherwise, message handlers may execute before the consumer has started listening
+                    // to incoming messages.
+                    Disposable disposable = listen()
+                        .filter(resp -> resp.correlationId() != null)
+                        .filter(resp -> resp.correlationId().equals(request.correlationId()))
+                        .take(1)
+                        .subscribe(sink::success, sink::error, sink::success);
+
+                    // cancel the nested subscription if the client cancels the outer Mono
+                    sink.onDispose(disposable);
+
+                    // manually fire the message event instead of sending it, because otherwise it will be received
+                    // immediately, replacing the response that might have been sent by the event handlers.
+                    messagingService.fireEvent(request);
+                });
             }
         };
     }