You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/03/04 11:38:34 UTC
[ignite-3] branch main updated: IGNITE-14279 "sendWithResponse"
introduced to NetworkCluster interface.
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d50c84c IGNITE-14279 "sendWithResponse" introduced to NetworkCluster interface.
d50c84c is described below
commit d50c84c2fb5dbe0bd2c2a6be93fe9f9dd1850524
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu Mar 4 14:33:51 2021 +0300
IGNITE-14279 "sendWithResponse" introduced to NetworkCluster interface.
Signed-off-by: ibessonov <be...@gmail.com>
---
.../org/apache/ignite/network/NetworkCluster.java | 15 ++++++++++++++-
.../network/scalecube/ScaleCubeNetworkCluster.java | 20 ++++++++++----------
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 5bdd576..f271b53 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
@@ -59,7 +60,19 @@ public interface NetworkCluster {
* @param member Network member which should receive the message.
* @param msg Message which should be delivered.
*/
- Future<?> guaranteedSend(NetworkMember member, Object msg);
+ Future<?> send(NetworkMember member, Object msg);
+
+ /**
+ * Asynchronously sends a message with the same guarantees as {@link #send(NetworkMember, Object)} and
+ * returns a response (RPC style).
+ *
+ * @param member Network member which should receive the message.
+ * @param msg A message.
+ * @param timeout Maximum time to wait for the response, in milliseconds.
+ * @param <R> Expected response type.
+ * @return A future holding the response, or an error if the expected response was not received.
+ */
+ <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout);
/**
* Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index 0dc1087..b985dba 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -16,19 +16,20 @@
*/
package org.apache.ignite.network.scalecube;
+import io.scalecube.cluster.Cluster;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import io.scalecube.cluster.Cluster;
+import org.apache.ignite.network.MessageHandlerHolder;
import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.network.NetworkClusterEventHandler;
import org.apache.ignite.network.NetworkHandlersProvider;
import org.apache.ignite.network.NetworkMember;
import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.MessageHandlerHolder;
import static io.scalecube.cluster.transport.api.Message.fromData;
+import static java.time.Duration.ofMillis;
/**
* Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -84,15 +85,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
}
/** {@inheritDoc} */
- @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
- cluster.send(memberResolver.resolveMember(member), fromData(msg))
- .block();
-
- CompletableFuture<Object> future = new CompletableFuture<>();
-
- future.complete(null);
+ @Override public Future<?> send(NetworkMember member, Object msg) {
+ return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+ }
- return future;
+ /** {@inheritDoc} */
+ @Override public <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout) {
+ return cluster.requestResponse(memberResolver.resolveMember(member), fromData(msg))
+ .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
}
/** {@inheritDoc} */