You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/22 12:16:25 UTC

[ignite-3] branch main updated: IGNITE-16677 Implement broadcast method of IgniteCompute interface

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cb12d27  IGNITE-16677 Implement broadcast method of IgniteCompute interface
cb12d27 is described below

commit cb12d27aa84d0ecfe83e79ddaa1ec6e865e98e9b
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Mar 22 11:33:06 2022 +0400

    IGNITE-16677 Implement broadcast method of IgniteCompute interface
---
 .../org/apache/ignite/compute/IgniteCompute.java   | 29 ++++++++-
 .../ignite/internal/compute/IgniteComputeImpl.java | 53 +++++++++++-----
 .../matchers/CompletableFutureMatcher.java         | 13 ++++
 .../ignite/internal/compute/ItComputeTest.java     | 71 ++++++++++++++++++++++
 4 files changed, 149 insertions(+), 17 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 59ebdc9..ca856dd 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.compute;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.network.ClusterNode;
@@ -29,7 +30,7 @@ import org.apache.ignite.network.ClusterNode;
  */
 public interface IgniteCompute {
     /**
-     * Executes a {@link ComputeJob}.
+     * Executes a {@link ComputeJob} represented by the given class on one node from the nodes set.
      *
      * @param nodes    nodes on which to execute the job
      * @param jobClass class of the job to execute
@@ -40,13 +41,35 @@ public interface IgniteCompute {
     <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
 
     /**
-     * Executes a {@link ComputeJob}.
+     * Executes a {@link ComputeJob} of the class with the given name on one node from the nodes set.
      *
-     * @param nodes    nodes on which to execute the job
+     * @param nodes    candidate nodes; the job will be executed on one of them
      * @param jobClassName name of the job class to execute
      * @param args     arguments of the job
      * @param <R>      job result type
      * @return future job result
      */
     <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args);
+
+    /**
+     * Executes a {@link ComputeJob} represented by the given class on all nodes from the given nodes set.
+     *
+     * @param nodes nodes on each of which the job will be executed
+     * @param jobClass class of the job to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return map from each node to the future result of the job executed on that node
+     */
+    <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a {@link ComputeJob} of the class with the given name on all nodes from the given nodes set.
+     *
+     * @param nodes nodes on each of which the job will be executed
+     * @param jobClassName name of the job class to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return map from each node to the future result of the job executed on that node
+     */
+    <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args);
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9a5b43a..9feb6c7 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
@@ -43,8 +46,27 @@ public class IgniteComputeImpl implements IgniteCompute {
     /** {@inheritDoc} */
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass, Object... args) {
-        ClusterNode targetNode = randomNode(nodes);
+        return executeOnOneNode(randomNode(nodes), jobClass, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        return executeOnOneNode(randomNode(nodes), jobClassName, args);
+    }
+
+    private ClusterNode randomNode(Set<ClusterNode> nodes) {
+        int nodesToSkip = random.nextInt(nodes.size());
+
+        Iterator<ClusterNode> iterator = nodes.iterator();
+        for (int i = 0; i < nodesToSkip; i++) {
+            iterator.next();
+        }
+
+        return iterator.next();
+    }
 
+    private <R> CompletableFuture<R> executeOnOneNode(ClusterNode targetNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
         if (isLocal(targetNode)) {
             return computeComponent.executeLocally(jobClass, args);
         } else {
@@ -52,11 +74,7 @@ public class IgniteComputeImpl implements IgniteCompute {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String jobClassName, Object... args) {
-        ClusterNode targetNode = randomNode(nodes);
-
+    private <R> CompletableFuture<R> executeOnOneNode(ClusterNode targetNode, String jobClassName, Object[] args) {
         if (isLocal(targetNode)) {
             return computeComponent.executeLocally(jobClassName, args);
         } else {
@@ -68,14 +86,21 @@ public class IgniteComputeImpl implements IgniteCompute {
         return targetNode.equals(topologyService.localMember());
     }
 
-    private ClusterNode randomNode(Set<ClusterNode> nodes) {
-        int nodesToSkip = random.nextInt(nodes.size());
-
-        Iterator<ClusterNode> iterator = nodes.iterator();
-        for (int i = 0; i < nodesToSkip; i++) {
-            iterator.next();
-        }
+    /** {@inheritDoc} */
+    @Override
+    public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(
+            Set<ClusterNode> nodes,
+            Class<? extends ComputeJob<R>> jobClass,
+            Object... args
+    ) {
+        return nodes.stream()
+                .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClass, args)));
+    }
 
-        return iterator.next();
+    /** {@inheritDoc} */
+    @Override
+    public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String jobClassName, Object... args) {
+        return nodes.stream()
+                .collect(toUnmodifiableMap(node -> node, node -> executeOnOneNode(node, jobClassName, args)));
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index e32fe0a..c8c7916 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.testframework.matchers;
 
+import static org.hamcrest.Matchers.equalTo;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -76,4 +78,15 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
     public static <T> CompletableFutureMatcher<T> willBe(Matcher<T> matcher) {
         return new CompletableFutureMatcher<>(matcher);
     }
+
+    /**
+     * Returns a Matcher matching the {@link CompletableFuture} under match if it completes successfully with the given value.
+     *
+     * @param value expected value
+     * @param <T> value type
+     * @return matcher
+     */
+    public static <T> CompletableFutureMatcher<T> willBe(T value) {
+        return willBe(equalTo(value));
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index 21be9fb..03b54f3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -18,14 +18,20 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
@@ -33,6 +39,7 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.internal.AbstractClusterIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -136,6 +143,70 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
         assertThat(ex.getCause().getCause(), is(notNullValue()));
     }
 
+    @Test
+    void broadcastsJobWithArguments() {
+        IgniteImpl entryNode = node(0);
+
+        Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+                .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), ConcatJob.class, "a", 42);
+
+        assertThat(results, is(aMapWithSize(3)));
+        for (int i = 0; i < 3; i++) {
+            ClusterNode node = node(i).node();
+            assertThat(results.get(node), willBe("a42"));
+        }
+    }
+
+    @Test
+    void broadcastsJobByClassName() {
+        IgniteImpl entryNode = node(0);
+
+        Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+                .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), ConcatJob.class.getName(), "a", 42);
+
+        assertThat(results, is(aMapWithSize(3)));
+        for (int i = 0; i < 3; i++) {
+            ClusterNode node = node(i).node();
+            assertThat(results.get(node), willBe("a42"));
+        }
+    }
+
+    @Test
+    void broadcastExecutesJobOnRespectiveNodes() {
+        IgniteImpl entryNode = node(0);
+
+        Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+                .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), GetNodeNameJob.class);
+
+        assertThat(results, is(aMapWithSize(3)));
+        for (int i = 0; i < 3; i++) {
+            ClusterNode node = node(i).node();
+            assertThat(results.get(node), willBe(equalTo(node.name())));
+        }
+    }
+
+    @Test
+    void broadcastsFailingJob() throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        Map<ClusterNode, CompletableFuture<String>> results = entryNode.compute()
+                .broadcast(Set.of(entryNode.node(), node(1).node(), node(2).node()), FailingJob.class);
+
+        assertThat(results, is(aMapWithSize(3)));
+        for (int i = 0; i < 3; i++) {
+            Object result = results.get(node(i).node())
+                    .handle((res, ex) -> ex != null ? ex : res)
+                    .get(1, TimeUnit.SECONDS);
+
+            assertThat(result, is(instanceOf(CompletionException.class)));
+            assertThat(((CompletionException) result).getCause(), is(instanceOf(JobException.class)));
+
+            JobException ex = (JobException) ((CompletionException) result).getCause();
+            assertThat(ex.getMessage(), is("Oops"));
+            assertThat(ex.getCause(), is(notNullValue()));
+        }
+    }
+
     private static class ConcatJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override