You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/22 12:16:25 UTC
[ignite-3] branch main updated: IGNITE-16677 Implement broadcast method of IgniteCompute interface
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cb12d27 IGNITE-16677 Implement broadcast method of IgniteCompute interface
cb12d27 is described below
commit cb12d27aa84d0ecfe83e79ddaa1ec6e865e98e9b
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Mar 22 11:33:06 2022 +0400
IGNITE-16677 Implement broadcast method of IgniteCompute interface
---
.../org/apache/ignite/compute/IgniteCompute.java | 29 ++++++++-
.../ignite/internal/compute/IgniteComputeImpl.java | 53 +++++++++++-----
.../matchers/CompletableFutureMatcher.java | 13 ++++
.../ignite/internal/compute/ItComputeTest.java | 71 ++++++++++++++++++++++
4 files changed, 149 insertions(+), 17 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 59ebdc9..ca856dd 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -17,6 +17,7 @@
package org.apache.ignite.compute;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.network.ClusterNode;
@@ -29,7 +30,7 @@ import org.apache.ignite.network.ClusterNode;
*/
public interface IgniteCompute {
/**
- * Executes a {@link ComputeJob}.
+ * Executes a {@link ComputeJob} represented by the given class on one node from the nodes set.
*
* @param nodes nodes on which to execute the job
* @param jobClass class of the job to execute
@@ -40,13 +41,35 @@ public interface IgniteCompute {
<R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
/**
- * Executes a {@link ComputeJob}.
+ * Executes a {@link ComputeJob} represented by the given class name on one node from the nodes set.
*
- * @param nodes nodes on which to execute the job
+ * @param nodes candidate nodes; the job will be executed on one of them
* @param jobClassName name of the job class to execute
* @param args arguments of the job
* @param <R> job result type
* @return future job result
*/
<R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args);
+
+ /**
+ * Executes a {@link ComputeJob} represented by the given class on all nodes from the given nodes set.
+ *
+ * @param nodes nodes on each of which the job will be executed
+ * @param jobClass class of the job to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return map from each node to the future result of the job executed on that node
+ */
+ <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a {@link ComputeJob} represented by the given class name on all nodes from the given nodes set.
+ *
+ * @param nodes nodes on each of which the job will be executed
+ * @param jobClassName name of the job class to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return map from each node to the future result of the job executed on that node
+ */
+ <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args);
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9a5b43a..9feb6c7 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.compute;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
@@ -43,8 +46,27 @@ public class IgniteComputeImpl implements IgniteCompute {
/** {@inheritDoc} */
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
- ClusterNode targetNode = randomNode(nodes);
+ return executeOnOneNode(randomNode(nodes), jobClass, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ return executeOnOneNode(randomNode(nodes), jobClassName, args);
+ }
+
+ private ClusterNode randomNode(Set<ClusterNode> nodes) {
+ int nodesToSkip = random.nextInt(nodes.size());
+
+ Iterator<ClusterNode> iterator = nodes.iterator();
+ for (int i = 0; i < nodesToSkip; i++) {
+ iterator.next();
+ }
+
+ return iterator.next();
+ }
+ private <R> CompletableFuture<R> executeOnOneNode(ClusterNode targetNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
if (isLocal(targetNode)) {
return computeComponent.executeLocally(jobClass, args);
} else {
@@ -52,11 +74,7 @@ public class IgniteComputeImpl implements IgniteCompute {
}
}
- /** {@inheritDoc} */
- @Override
- public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
- ClusterNode targetNode = randomNode(nodes);
-
+ private <R> CompletableFuture<R> executeOnOneNode(ClusterNode targetNode, String jobClassName, Object[] args) {
if (isLocal(targetNode)) {
return computeComponent.executeLocally(jobClassName, args);
} else {
@@ -68,14 +86,21 @@ public class IgniteComputeImpl implements IgniteCompute {
return targetNode.equals(topologyService.localMember());
}
- private ClusterNode randomNode(Set<ClusterNode> nodes) {
- int nodesToSkip = random.nextInt(nodes.size());
-
- Iterator<ClusterNode> iterator = nodes.iterator();
- for (int i = 0; i < nodesToSkip; i++) {
- iterator.next();
- }
+ /** {@inheritDoc} */
+ @Override
+ public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(
+ Set<ClusterNode> nodes,
+ Class<? extends ComputeJob<R>> jobClass,
+ Object... args
+ ) {
+ return nodes.stream()
+ .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClass, args)));
+ }
- return iterator.next();
+ /** {@inheritDoc} */
+ @Override
+ public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+ return nodes.stream()
+ .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClassName, args)));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index e32fe0a..c8c7916 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.testframework.matchers;
+import static org.hamcrest.Matchers.equalTo;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -76,4 +78,15 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
public static <T> CompletableFutureMatcher<T> willBe(Matcher<T> matcher) {
return new CompletableFutureMatcher<>(matcher);
}
+
+ /**
+ * Returns a Matcher matching the {@link CompletableFuture} under match if it completes successfully with the given value.
+ *
+ * @param value expected value
+ * @param <T> value type
+ * @return matcher
+ */
+ public static <T> CompletableFutureMatcher<T> willBe(T value) {
+ return willBe(equalTo(value));
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index 21be9fb..03b54f3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -18,14 +18,20 @@
package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
@@ -33,6 +39,7 @@ import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.AbstractClusterIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.Test;
/**
@@ -136,6 +143,70 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
assertThat(ex.getCause().getCause(), is(notNullValue()));
}
+ @Test
+ void broadcastsJobWithArguments() {
+ IgniteImpl entryNode = node(0);
+
+ Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+ .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), ConcatJob.class, "a", 42);
+
+ assertThat(results, is(aMapWithSize(3)));
+ for (int i = 0; i < 3; i++) {
+ ClusterNode node = node(i).node();
+ assertThat(results.get(node), willBe("a42"));
+ }
+ }
+
+ @Test
+ void broadcastsJobByClassName() {
+ IgniteImpl entryNode = node(0);
+
+ Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+ .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), ConcatJob.class.getName(), "a", 42);
+
+ assertThat(results, is(aMapWithSize(3)));
+ for (int i = 0; i < 3; i++) {
+ ClusterNode node = node(i).node();
+ assertThat(results.get(node), willBe("a42"));
+ }
+ }
+
+ @Test
+ void broadcastExecutesJobOnRespectiveNodes() {
+ IgniteImpl entryNode = node(0);
+
+ Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+ .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), GetNodeNameJob.class);
+
+ assertThat(results, is(aMapWithSize(3)));
+ for (int i = 0; i < 3; i++) {
+ ClusterNode node = node(i).node();
+ assertThat(results.get(node), willBe(equalTo(node.name())));
+ }
+ }
+
+ @Test
+ void broadcastsFailingJob() throws Exception {
+ IgniteImpl entryNode = node(0);
+
+ Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+ .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), FailingJob.class);
+
+ assertThat(results, is(aMapWithSize(3)));
+ for (int i = 0; i < 3; i++) {
+ Object result = results.get(node(i).node())
+ .handle((res, ex) -> ex != null ? ex : res)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(result, is(instanceOf(CompletionException.class)));
+ assertThat(((CompletionException) result).getCause(), is(instanceOf(JobException.class)));
+
+ JobException ex = (JobException) ((CompletionException) result).getCause();
+ assertThat(ex.getMessage(), is("Oops"));
+ assertThat(ex.getCause(), is(notNullValue()));
+ }
+ }
+
private static class ConcatJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override