You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/03/04 11:38:34 UTC

[ignite-3] branch main updated: IGNITE-14279 "sendWithResponse" introduced to NetworkCluster interface.

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d50c84c  IGNITE-14279 "sendWithResponse" introduced to NetworkCluster interface.
d50c84c is described below

commit d50c84c2fb5dbe0bd2c2a6be93fe9f9dd1850524
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu Mar 4 14:33:51 2021 +0300

    IGNITE-14279 "sendWithResponse" introduced to NetworkCluster interface.
    
    Signed-off-by: ibessonov <be...@gmail.com>
---
 .../org/apache/ignite/network/NetworkCluster.java    | 15 ++++++++++++++-
 .../network/scalecube/ScaleCubeNetworkCluster.java   | 20 ++++++++++----------
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 5bdd576..f271b53 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.network;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 /**
@@ -59,7 +60,19 @@ public interface NetworkCluster {
      * @param member Network member which should receive the message.
      * @param msg Message which should be delivered.
      */
-    Future<?> guaranteedSend(NetworkMember member, Object msg);
+    Future<?> send(NetworkMember member, Object msg);
+
+    /**
+     * Sends asynchronously a message with same guarantees as for {@link #send(NetworkMember, Object)} and
+     * returns a response (RPC style).
+     *
+     * @param member Network member which should receive the message.
+     * @param msg A message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @param <R> Expected response type.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout);
 
     /**
      * Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index 0dc1087..b985dba 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -16,19 +16,20 @@
  */
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.Cluster;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
-import io.scalecube.cluster.Cluster;
+import org.apache.ignite.network.MessageHandlerHolder;
 import org.apache.ignite.network.NetworkCluster;
 import org.apache.ignite.network.NetworkClusterEventHandler;
 import org.apache.ignite.network.NetworkHandlersProvider;
 import org.apache.ignite.network.NetworkMember;
 import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.MessageHandlerHolder;
 
 import static io.scalecube.cluster.transport.api.Message.fromData;
+import static java.time.Duration.ofMillis;
 
 /**
  * Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -84,15 +85,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
-        cluster.send(memberResolver.resolveMember(member), fromData(msg))
-            .block();
-
-        CompletableFuture<Object> future = new CompletableFuture<>();
-
-        future.complete(null);
+    @Override public Future<?> send(NetworkMember member, Object msg) {
+        return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+    }
 
-        return future;
+    /** {@inheritDoc} */
+    @Override public <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout) {
+        return cluster.requestResponse(memberResolver.resolveMember(member), fromData(msg))
+            .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
     }
 
     /** {@inheritDoc} */