You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/02 09:05:40 UTC
[2/2] ignite git commit: IGNITE-4580: New asyncs for Compute API.
IGNITE-4580: New asyncs for Compute API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5969cd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5969cd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5969cd3
Branch: refs/heads/ignite-4475-async
Commit: a5969cd3f46f8f814ea73caaa708504cf21e4350
Parents: 739c606
Author: devozerov <vo...@gridgain.com>
Authored: Thu Feb 2 12:05:23 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Feb 2 12:05:23 2017 +0300
----------------------------------------------------------------------
.../computegrid/ComputeAsyncExample.java | 8 +-
.../ComputeFibonacciContinuationExample.java | 13 +-
.../datastructures/IgniteLockExample.java | 4 +-
.../datastructures/IgniteSemaphoreExample.java | 4 +-
.../java/org/apache/ignite/IgniteCompute.java | 262 ++++++++-
.../ignite/internal/IgniteComputeImpl.java | 549 ++++++++++++++++---
.../internal/cluster/ClusterGroupAdapter.java | 2 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 6 +-
.../platform/compute/PlatformCompute.java | 14 +-
...formDotNetEntityFrameworkCacheExtension.java | 7 +-
.../visor/cache/VisorCacheClearTask.java | 6 +-
.../apache/ignite/lang/IgniteAsyncSupport.java | 52 +-
.../ignite/lang/IgniteAsyncSupported.java | 4 +-
...gniteComputeConfigVariationsFullApiTest.java | 533 +++++++++++++++++-
14 files changed, 1317 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java
index e8321a5..8064ace 100644
--- a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeAsyncExample.java
@@ -49,21 +49,19 @@ public class ComputeAsyncExample {
System.out.println("Compute asynchronous example started.");
// Enable asynchronous mode.
- IgniteCompute compute = ignite.compute().withAsync();
+ IgniteCompute compute = ignite.compute();
Collection<IgniteFuture<?>> futs = new ArrayList<>();
// Iterate through all words in the sentence and create runnable jobs.
for (final String word : "Print words using runnable".split(" ")) {
// Execute runnable on some node.
- compute.run(new IgniteRunnable() {
+ futs.add(compute.runAsync(new IgniteRunnable() {
@Override public void run() {
System.out.println();
System.out.println(">>> Printing '" + word + "' on this node from ignite job.");
}
- });
-
- futs.add(compute.future());
+ }));
}
// Wait for completion of all futures.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java
index 6642e9d..0fe12f1 100644
--- a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeFibonacciContinuationExample.java
@@ -27,7 +27,6 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
@@ -142,13 +141,12 @@ public final class ComputeFibonacciContinuationExample {
ClusterGroup p = ignite.cluster().forPredicate(nodeFilter);
- IgniteCompute compute = ignite.compute(p).withAsync();
+ IgniteCompute compute = ignite.compute(p);
// If future is not cached in node-local-map, cache it.
if (fut1 == null) {
- compute.apply(new ContinuationFibonacciClosure(nodeFilter), n - 1);
-
- ComputeTaskFuture<BigInteger> futVal = compute.future();
+ IgniteFuture<BigInteger> futVal = compute.applyAsync(
+ new ContinuationFibonacciClosure(nodeFilter), n - 1);
fut1 = locMap.putIfAbsent(n - 1, futVal);
@@ -158,9 +156,8 @@ public final class ComputeFibonacciContinuationExample {
// If future is not cached in node-local-map, cache it.
if (fut2 == null) {
- compute.apply(new ContinuationFibonacciClosure(nodeFilter), n - 2);
-
- ComputeTaskFuture<BigInteger> futVal = compute.<BigInteger>future();
+ IgniteFuture<BigInteger> futVal = compute.applyAsync(
+ new ContinuationFibonacciClosure(nodeFilter), n - 2);
fut2 = locMap.putIfAbsent(n - 2, futVal);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
index 1f84787..ba035ae 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
@@ -87,11 +87,11 @@ public class IgniteLockExample {
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
- ignite.compute().withAsync().run(new Consumer(reentrantLockName));
+ ignite.compute().runAsync(new Consumer(reentrantLockName));
// Start producers on all cluster nodes.
for (int i = 0; i < NUM_PRODUCERS; i++)
- ignite.compute().withAsync().run(new Producer(reentrantLockName));
+ ignite.compute().runAsync(new Producer(reentrantLockName));
System.out.println("Master node is waiting for all other nodes to finish...");
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 1c078b0..12d1eab 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -67,11 +67,11 @@ public class IgniteSemaphoreExample {
// Start consumers on all cluster nodes.
for (int i = 0; i < NUM_CONSUMERS; i++)
- ignite.compute().withAsync().run(new Consumer(semaphoreName));
+ ignite.compute().runAsync(new Consumer(semaphoreName));
// Start producers on all cluster nodes.
for (int i = 0; i < NUM_PRODUCERS; i++)
- ignite.compute().withAsync().run(new Producer(semaphoreName));
+ ignite.compute().runAsync(new Producer(semaphoreName));
System.out.println("Master node is waiting for all other nodes to finish...");
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index 212849a..947089c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -28,6 +28,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
@@ -113,6 +114,7 @@ import org.jetbrains.annotations.Nullable;
* checkpoints, etc.). If you need to override configured defaults, you should use compute task together with
* {@link ComputeTaskSpis} annotation. Refer to {@link ComputeTask} documentation for more information.
*/
+@SuppressWarnings("deprecation")
public interface IgniteCompute extends IgniteAsyncSupport {
/**
* Gets cluster group to which this {@code IgniteCompute} instance belongs.
@@ -135,6 +137,20 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public void affinityRun(@Nullable String cacheName, Object affKey, IgniteRunnable job) throws IgniteException;
/**
+ * Executes given job asynchronously on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed.
+ *
+ * @param cacheName Name of the cache to use for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @return a Future representing pending completion of the affinity run.
+ * @throws IgniteException If job failed.
+ */
+ public IgniteFuture<Void> affinityRunAsync(@Nullable String cacheName, Object affKey, IgniteRunnable job)
+ throws IgniteException;
+
+ /**
* Executes given job on the node where data for provided affinity key is located
* (a.k.a. affinity co-location). The data of the partition where affKey is stored
* will not be migrated from the target node while the job is executed. The data
@@ -150,6 +166,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Executes given job asynchronously on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed. The data
+ * of the extra caches' partitions with the same partition number also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @return a Future representing pending completion of the affinity run.
+ * @throws IgniteException If job failed.
+ */
+ public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteRunnable job) throws IgniteException;
+
+ /**
* Executes given job on the node where partition is located (the partition is primary on the node)
* The data of the partition will not be migrated from the target node
* while the job is executed. The data of the extra caches' partitions with the same partition number
@@ -165,6 +196,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Executes given job asynchronously on the node where partition is located (the partition is primary on the node).
+ * The data of the partition will not be migrated from the target node
+ * while the job is executed. The data of the extra caches' partitions with the same partition number
+ * also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param partId Partition number.
+ * @param job Job which will be co-located on the node where the given partition is located.
+ * @return a Future representing pending completion of the affinity run.
+ * @throws IgniteException If job failed.
+ */
+ public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, int partId,
+ IgniteRunnable job) throws IgniteException;
+
+ /**
* Executes given job on the node where data for provided affinity key is located
* (a.k.a. affinity co-location). The data of the partition where affKey is stored
* will not be migrated from the target node while the job is executed.
@@ -179,6 +225,20 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) throws IgniteException;
/**
+ * Executes given job asynchronously on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed.
+ *
+ * @param cacheName Name of the cache to use for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @return a Future representing pending completion of the affinity call.
+ * @throws IgniteException If job failed.
+ */
+ public <R> IgniteFuture<R> affinityCallAsync(@Nullable String cacheName, Object affKey, IgniteCallable<R> job)
+ throws IgniteException;
+
+ /**
* Executes given job on the node where data for provided affinity key is located
* (a.k.a. affinity co-location). The data of the partition where affKey is stored
* will not be migrated from the target node while the job is executed. The data
@@ -195,6 +255,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Executes given job asynchronously on the node where data for provided affinity key is located
+ * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+ * will not be migrated from the target node while the job is executed. The data
+ * of the extra caches' partitions with the same partition number also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param affKey Affinity key.
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @return a Future representing pending completion of the affinity call.
+ * @throws IgniteException If job failed.
+ */
+ public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteCallable<R> job) throws IgniteException;
+
+ /**
* Executes given job on the node where partition is located (the partition is primary on the node)
* The data of the partition will not be migrated from the target node
* while the job is executed. The data of the extra caches' partitions with the same partition number
@@ -211,6 +286,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Executes given job asynchronously on the node where partition is located (the partition is primary on the node).
+ * The data of the partition will not be migrated from the target node
+ * while the job is executed. The data of the extra caches' partitions with the same partition number
+ * also will not be migrated.
+ *
+ * @param cacheNames Names of the caches to reserve the partition. The first cache is used for affinity co-location.
+ * @param partId Partition to reserve.
+ * @param job Job which will be co-located on the node where the given partition is located.
+ * @return a Future representing pending completion of the affinity call.
+ * @throws IgniteException If job failed.
+ */
+ public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, int partId,
+ IgniteCallable<R> job) throws IgniteException;
+
+ /**
* Executes given task on within the cluster group. For step-by-step explanation of task execution process
* refer to {@link ComputeTask} documentation.
*
@@ -225,6 +315,20 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <T, R> R execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) throws IgniteException;
/**
+ * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution
+ * process refer to {@link ComputeTask} documentation.
+ *
+ * @param taskCls Class of the task to execute. If class has {@link ComputeTaskName} annotation,
+ * then task is deployed under a name specified within annotation. Otherwise, full
+ * class name is used as task name.
+ * @param arg Optional argument of task execution, can be {@code null}.
+ * @return a Future representing pending completion of the task.
+ * @throws IgniteException If task failed.
+ */
+ public <T, R> ComputeTaskFuture<R> executeAsync(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg)
+ throws IgniteException;
+
+ /**
* Executes given task within the cluster group. For step-by-step explanation of task execution process
* refer to {@link ComputeTask} documentation.
*
@@ -239,6 +343,19 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) throws IgniteException;
/**
+ * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution
+ * process refer to {@link ComputeTask} documentation.
+ *
+ * @param task Instance of task to execute. If task class has {@link ComputeTaskName} annotation,
+ * then task is deployed under a name specified within annotation. Otherwise, full
+ * class name is used as task name.
+ * @param arg Optional argument of task execution, can be {@code null}.
+ * @return a Future representing pending completion of the task.
+ * @throws IgniteException If task failed.
+ */
+ public <T, R> ComputeTaskFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg) throws IgniteException;
+
+ /**
* Executes given task within the cluster group. For step-by-step explanation of task execution process
* refer to {@link ComputeTask} documentation.
* <p>
@@ -255,6 +372,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <T, R> R execute(String taskName, @Nullable T arg) throws IgniteException;
/**
+ * Executes given task asynchronously within the cluster group. For step-by-step explanation of task execution
+ * process refer to {@link ComputeTask} documentation.
+ * <p>
+ * If task for given name has not been deployed yet, then {@code taskName} will be
+ * used as task class name to auto-deploy the task (see {@link #localDeployTask(Class, ClassLoader)} method).
+ *
+ * @param taskName Name of the task to execute.
+ * @param arg Optional argument of task execution, can be {@code null}.
+ * @return a Future representing pending completion of the task.
+ * @throws IgniteException If task failed.
+ * @see ComputeTask for information about task execution.
+ */
+ public <T, R> ComputeTaskFuture<R> executeAsync(String taskName, @Nullable T arg) throws IgniteException;
+
+ /**
* Broadcasts given job to all nodes in the cluster group.
*
* @param job Job to broadcast to all cluster group nodes.
@@ -264,6 +396,15 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public void broadcast(IgniteRunnable job) throws IgniteException;
/**
+ * Broadcasts given job asynchronously to all nodes in the cluster group.
+ *
+ * @param job Job to broadcast to all cluster group nodes.
+ * @return a Future representing pending completion of the broadcast execution of the job.
+ * @throws IgniteException If job failed.
+ */
+ public IgniteFuture<Void> broadcastAsync(IgniteRunnable job) throws IgniteException;
+
+ /**
* Broadcasts given job to all nodes in cluster group. Every participating node will return a
* job result. Collection of all returned job results is returned from the result future.
*
@@ -275,6 +416,16 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R> Collection<R> broadcast(IgniteCallable<R> job) throws IgniteException;
/**
+ * Broadcasts given job asynchronously to all nodes in cluster group. Every participating node will return a
+ * job result. Collection of all returned job results is returned from the result future.
+ *
+ * @param job Job to broadcast to all cluster group nodes.
+ * @return a Future representing pending completion of the broadcast execution of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <R> IgniteFuture<Collection<R>> broadcastAsync(IgniteCallable<R> job) throws IgniteException;
+
+ /**
* Broadcasts given closure job with passed in argument to all nodes in the cluster group.
* Every participating node will return a job result. Collection of all returned job results
* is returned from the result future.
@@ -288,6 +439,19 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException;
/**
+ * Broadcasts given closure job asynchronously with passed in argument to all nodes in the cluster group.
+ * Every participating node will return a job result. Collection of all returned job results
+ * is returned from the result future.
+ *
+ * @param job Job to broadcast to all cluster group nodes.
+ * @param arg Job closure argument.
+ * @return a Future representing pending completion of the broadcast execution of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <R, T> IgniteFuture<Collection<R>> broadcastAsync(IgniteClosure<T, R> job, @Nullable T arg)
+ throws IgniteException;
+
+ /**
* Executes provided job on a node within the underlying cluster group.
*
* @param job Job closure to execute.
@@ -297,6 +461,15 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public void run(IgniteRunnable job) throws IgniteException;
/**
+ * Executes provided job asynchronously on a node within the underlying cluster group.
+ *
+ * @param job Job closure to execute.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public IgniteFuture<Void> runAsync(IgniteRunnable job) throws IgniteException;
+
+ /**
* Executes collection of jobs on grid nodes within the underlying cluster group.
*
* @param jobs Collection of jobs to execute.
@@ -306,6 +479,16 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public void run(Collection<? extends IgniteRunnable> jobs) throws IgniteException;
/**
+ * Executes collection of jobs asynchronously on grid nodes within the underlying cluster group.
+ * Returns control immediately; completion is reported through the returned future.
+ *
+ * @param jobs Collection of jobs to execute.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public IgniteFuture<Void> runAsync(Collection<? extends IgniteRunnable> jobs) throws IgniteException;
+
+ /**
* Executes provided job on a node within the underlying cluster group. The result of the
* job execution is returned from the result closure.
*
@@ -317,6 +500,16 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R> R call(IgniteCallable<R> job) throws IgniteException;
/**
+ * Executes provided job asynchronously on a node within the underlying cluster group. The result of the
+ * job execution is returned from the result future.
+ *
+ * @param job Job to execute.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job) throws IgniteException;
+
+ /**
* Executes collection of jobs on nodes within the underlying cluster group.
* Collection of all returned job results is returned from the result future.
*
@@ -328,12 +521,23 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R> Collection<R> call(Collection<? extends IgniteCallable<R>> jobs) throws IgniteException;
/**
+ * Executes collection of jobs asynchronously on nodes within the underlying cluster group.
+ * Collection of all returned job results is returned from the result future.
+ *
+ * @param jobs Collection of jobs to execute.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <R> IgniteFuture<Collection<R>> callAsync(Collection<? extends IgniteCallable<R>> jobs)
+ throws IgniteException;
+
+ /**
* Executes collection of jobs on nodes within the underlying cluster group. The returned
* job results will be reduced into an individual result by provided reducer.
*
* @param jobs Collection of jobs to execute.
* @param rdc Reducer to reduce all job results into one individual return value.
- * @return Future with reduced job result for this execution.
+ * @return Reduced job result for this execution.
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
@@ -341,6 +545,18 @@ public interface IgniteCompute extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Executes collection of jobs asynchronously on nodes within the underlying cluster group. The returned
+ * job results will be reduced into an individual result by provided reducer.
+ *
+ * @param jobs Collection of jobs to execute.
+ * @param rdc Reducer to reduce all job results into one individual return value.
+ * @return a Future with reduced job result for this execution.
+ * @throws IgniteException If execution failed.
+ */
+ public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends IgniteCallable<R1>> jobs,
+ IgniteReducer<R1, R2> rdc) throws IgniteException;
+
+ /**
* Executes provided closure job on a node within the underlying cluster group. This method is different
* from {@code run(...)} and {@code call(...)} methods in a way that it receives job argument
* which is then passed into the closure at execution time.
@@ -354,6 +570,18 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException;
/**
+ * Executes provided closure job asynchronously on a node within the underlying cluster group.
+ * This method is different from {@code run(...)} and {@code call(...)} methods in a way that
+ * it receives job argument which is then passed into the closure at execution time.
+ *
+ * @param job Job to run.
+ * @param arg Job argument.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R> job, @Nullable T arg) throws IgniteException;
+
+ /**
* Executes provided closure job on nodes within the underlying cluster group. A new job is executed for
* every argument in the passed in collection. The number of actual job executions will be
* equal to size of the job arguments collection.
@@ -367,6 +595,19 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public <T, R> Collection<R> apply(IgniteClosure<T, R> job, Collection<? extends T> args) throws IgniteException;
/**
+ * Executes provided closure job asynchronously on nodes within the underlying cluster group. A new job is executed
+ * for every argument in the passed in collection. The number of actual job executions will be
+ * equal to size of the job arguments collection.
+ *
+ * @param job Job to run.
+ * @param args Job arguments.
+ * @return a Future representing pending completion of the job.
+ * @throws IgniteException If execution failed.
+ */
+ public <T, R> IgniteFuture<Collection<R>> applyAsync(IgniteClosure<T, R> job, Collection<? extends T> args)
+ throws IgniteException;
+
+ /**
* Executes provided closure job on nodes within the underlying cluster group. A new job is executed for
* every argument in the passed in collection. The number of actual job executions will be
* equal to size of the job arguments collection. The returned job results will be reduced
@@ -375,7 +616,7 @@ public interface IgniteCompute extends IgniteAsyncSupport {
* @param job Job to run.
* @param args Job arguments.
* @param rdc Reducer to reduce all job results into one individual return value.
- * @return Future with reduced job result for this execution.
+ * @return Reduced job result for this execution.
* @throws IgniteException If execution failed.
*/
@IgniteAsyncSupported
@@ -383,6 +624,21 @@ public interface IgniteCompute extends IgniteAsyncSupport {
IgniteReducer<R1, R2> rdc) throws IgniteException;
/**
+ * Executes provided closure job asynchronously on nodes within the underlying cluster group. A new job is executed
+ * for every argument in the passed in collection. The number of actual job executions will be
+ * equal to size of the job arguments collection. The returned job results will be reduced
+ * into an individual result by provided reducer.
+ *
+ * @param job Job to run.
+ * @param args Job arguments.
+ * @param rdc Reducer to reduce all job results into one individual return value.
+ * @return a Future with reduced job result for this execution.
+ * @throws IgniteException If execution failed.
+ */
+ public <R1, R2, T> IgniteFuture<R2> applyAsync(IgniteClosure<T, R1> job, Collection<? extends T> args,
+ IgniteReducer<R1, R2> rdc) throws IgniteException;
+
+ /**
* Gets tasks future for active tasks started on local node.
*
* @return Map of active tasks keyed by their task task session ID.
@@ -481,8 +737,10 @@ public interface IgniteCompute extends IgniteAsyncSupport {
public void undeployTask(String taskName) throws IgniteException;
/** {@inheritDoc} */
+ @Deprecated
@Override public <R> ComputeTaskFuture<R> future();
/** {@inheritDoc} */
+ @Deprecated
@Override public IgniteCompute withAsync();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 3900c1f..0fa8ae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteDeploymentException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
@@ -83,9 +84,18 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
* @param ctx Kernal context.
* @param prj Projection.
* @param subjId Subject ID.
+ */
+ public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId) {
+ this(ctx, prj, subjId, false);
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param prj Projection.
+ * @param subjId Subject ID.
* @param async Async support flag.
*/
- public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async) {
+ private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async) {
super(async);
this.ctx = ctx;
@@ -105,6 +115,29 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void affinityRun(@Nullable String cacheName, Object affKey, IgniteRunnable job) {
+ try {
+ saveOrGet(affinityRunAsync0(cacheName, affKey, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> affinityRunAsync(@Nullable String cacheName, Object affKey,
+ IgniteRunnable job) throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheName, affKey, job));
+ }
+
+ /**
+ * Affinity run implementation.
+ *
+ * @param cacheName Cache name.
+ * @param affKey Affinity key.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private IgniteInternalFuture<?> affinityRunAsync0(@Nullable String cacheName, Object affKey, IgniteRunnable job) {
A.notNull(affKey, "affKey");
A.notNull(job, "job");
@@ -119,8 +152,8 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey,
- job, prj.nodes()));
+ return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey,
+ job, prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -132,6 +165,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) {
+ try {
+ saveOrGet(affinityRunAsync0(cacheNames, affKey, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteRunnable job) throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, affKey, job));
+ }
+
+ /**
+ * Affinity run implementation.
+ *
+ * @param cacheNames Cache names collection.
+ * @param affKey Affinity key.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private IgniteInternalFuture<?> affinityRunAsync0(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteRunnable job) {
A.notNull(affKey, "affKey");
A.notNull(job, "job");
A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
@@ -149,7 +206,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- saveOrGet(ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes()));
+ return ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -161,6 +218,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) {
+ try {
+ saveOrGet(affinityRunAsync0(cacheNames, partId, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> affinityRunAsync(@NotNull Collection<String> cacheNames, int partId,
+ IgniteRunnable job) throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, partId, job));
+ }
+
+ /**
+ * Affinity run implementation.
+ *
+ * @param cacheNames Cache names collection.
+ * @param partId Partition ID.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private IgniteInternalFuture<?> affinityRunAsync0(@NotNull Collection<String> cacheNames, int partId,
+ IgniteRunnable job) {
A.ensure(partId >= 0, "partId = " + partId);
A.notNull(job, "job");
A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
@@ -168,7 +249,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- saveOrGet(ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes()));
+ return ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -180,6 +261,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) {
+ try {
+ return saveOrGet(affinityCallAsync0(cacheName, affKey, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> affinityCallAsync(@Nullable String cacheName, Object affKey,
+ IgniteCallable<R> job) throws IgniteException {
+ return createFuture(affinityCallAsync0(cacheName, affKey, job));
+ }
+
+ /**
+ * Affinity call implementation.
+ *
+ * @param cacheName Cache name.
+ * @param affKey Affinity key.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<R> affinityCallAsync0(@Nullable String cacheName, Object affKey,
+ IgniteCallable<R> job) {
A.notNull(affKey, "affKey");
A.notNull(job, "job");
@@ -194,8 +299,8 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job,
- prj.nodes()));
+ return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job,
+ prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -207,8 +312,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) {
+ try {
+ return saveOrGet(affinityCallAsync0(cacheNames, affKey, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteCallable<R> job) throws IgniteException {
+ return createFuture(affinityCallAsync0(cacheNames, affKey, job));
+ }
+ /**
+ * Affinity call implementation.
+ *
+ * @param cacheNames Cache names collection.
+ * @param affKey Affinity key.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull Collection<String> cacheNames, Object affKey,
+ IgniteCallable<R> job) {
A.notNull(affKey, "affKey");
A.notNull(job, "job");
A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
@@ -226,7 +353,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes()));
+ return ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -238,6 +365,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) {
+ try {
+ return saveOrGet(affinityCallAsync0(cacheNames, partId, job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull Collection<String> cacheNames, int partId,
+ IgniteCallable<R> job) throws IgniteException {
+ return createFuture(affinityCallAsync0(cacheNames, partId, job));
+ }
+
+ /**
+ * Affinity call implementation.
+ *
+ * @param cacheNames Cache names collection.
+ * @param partId Partition ID.
+ * @param job Job.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull Collection<String> cacheNames, int partId,
+ IgniteCallable<R> job) {
A.ensure(partId >= 0, "partId = " + partId);
A.notNull(job, "job");
A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
@@ -245,7 +396,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes()));
+ return ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -258,6 +409,28 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T, R> R execute(String taskName, @Nullable T arg) {
+ try {
+ return (R)saveOrGet(executeAsync0(taskName, arg));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> ComputeTaskFuture<R> executeAsync(String taskName, @Nullable T arg) throws IgniteException {
+ return (ComputeTaskFuture<R>)createFuture(executeAsync0(taskName, arg));
+ }
+
+ /**
+ * Execute implementation.
+ *
+ * @param taskName Task name.
+ * @param arg Argument.
+ * @return Internal future.
+ */
+ @SuppressWarnings("unchecked")
+ private <T, R> IgniteInternalFuture<R> executeAsync0(String taskName, @Nullable T arg) {
A.notNull(taskName, "taskName");
guard();
@@ -266,10 +439,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
- return (R)saveOrGet(ctx.task().execute(taskName, arg));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.task().execute(taskName, arg);
}
finally {
unguard();
@@ -278,6 +448,29 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) {
+ try {
+ return (R)saveOrGet(executeAsync0(taskCls, arg));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> ComputeTaskFuture<R> executeAsync(Class<? extends ComputeTask<T, R>> taskCls,
+ @Nullable T arg) throws IgniteException {
+ return (ComputeTaskFuture<R>)createFuture(executeAsync0(taskCls, arg));
+ }
+
+ /**
+ * Execute implementation.
+ *
+ * @param taskCls Task class.
+ * @param arg Argument.
+ * @return Internal future.
+ */
+ @SuppressWarnings("unchecked")
+ private <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) {
A.notNull(taskCls, "taskCls");
guard();
@@ -286,10 +479,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
- return saveOrGet(ctx.task().execute(taskCls, arg));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.task().execute(taskCls, arg);
}
finally {
unguard();
@@ -298,30 +488,28 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) {
- A.notNull(task, "task");
-
- guard();
-
try {
- ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
- ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
-
- return saveOrGet(ctx.task().execute(task, arg));
+ return (R)saveOrGet(executeAsync0(task, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
- finally {
- unguard();
- }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> ComputeTaskFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg)
+ throws IgniteException {
+ return (ComputeTaskFuture<R>)createFuture(executeAsync0(task, arg));
}
/**
+ * Execute implementation.
+ *
* @param task Task.
* @param arg Task argument.
* @return Task future.
*/
- public <T, R> ComputeTaskInternalFuture<R> executeAsync(ComputeTask<T, R> task, @Nullable T arg) {
+ public <T, R> ComputeTaskInternalFuture<R> executeAsync0(ComputeTask<T, R> task, @Nullable T arg) {
A.notNull(task, "task");
guard();
@@ -337,21 +525,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
}
}
+ /** {@inheritDoc} */
+ @Override public void broadcast(IgniteRunnable job) {
+ try {
+ saveOrGet(broadcastAsync0(job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> broadcastAsync(IgniteRunnable job) throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(broadcastAsync0(job));
+ }
+
/**
- * @param taskName Task name.
- * @param arg Task argument.
- * @return Task future.
+ * Broadcast implementation.
+ *
+ * @param job Job.
+ * @return Internal future.
*/
- public <T, R> ComputeTaskInternalFuture<R> executeAsync(String taskName, @Nullable T arg) {
- A.notNull(taskName, "taskName");
+ private IgniteInternalFuture<?> broadcastAsync0(IgniteRunnable job) {
+ A.notNull(job, "job");
guard();
try {
- ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
- ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
-
- return ctx.task().execute(taskName, arg);
+ return ctx.closure().runAsync(BROADCAST, job, prj.nodes());
}
finally {
unguard();
@@ -359,33 +560,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
}
/** {@inheritDoc} */
- @Override public void broadcast(IgniteRunnable job) {
- A.notNull(job, "job");
-
- guard();
-
+ @Override public <R> Collection<R> broadcast(IgniteCallable<R> job) {
try {
- saveOrGet(ctx.closure().runAsync(BROADCAST, job, prj.nodes()));
+ return saveOrGet(broadcastAsync0(job));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
- finally {
- unguard();
- }
}
/** {@inheritDoc} */
- @Override public <R> Collection<R> broadcast(IgniteCallable<R> job) {
+ @Override public <R> IgniteFuture<Collection<R>> broadcastAsync(IgniteCallable<R> job) throws IgniteException {
+ return createFuture(broadcastAsync0(job));
+ }
+
+ /**
+ * Broadcast implementation.
+ *
+ * @param job Job.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<Collection<R>> broadcastAsync0(IgniteCallable<R> job) {
A.notNull(job, "job");
guard();
try {
- return saveOrGet(ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes());
}
finally {
unguard();
@@ -394,15 +595,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R, T> Collection<R> broadcast(IgniteClosure<T, R> job, @Nullable T arg) {
+ try {
+ return saveOrGet(broadcastAsync0(job, arg));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R, T> IgniteFuture<Collection<R>> broadcastAsync(IgniteClosure<T, R> job,
+ @Nullable T arg) throws IgniteException {
+ return createFuture(broadcastAsync0(job, arg));
+ }
+
+ /**
+ * Broadcast implementation.
+ *
+ * @param job Job.
+ * @param arg Argument.
+ * @return Internal future.
+ */
+ private <R, T> IgniteInternalFuture<Collection<R>> broadcastAsync0(IgniteClosure<T, R> job, @Nullable T arg) {
A.notNull(job, "job");
guard();
try {
- return saveOrGet(ctx.closure().broadcast(job, arg, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().broadcast(job, arg, prj.nodes());
}
finally {
unguard();
@@ -411,15 +631,32 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void run(IgniteRunnable job) {
+ try {
+ saveOrGet(runAsync0(job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> runAsync(IgniteRunnable job) throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(runAsync0(job));
+ }
+
+ /**
+ * Run implementation.
+ *
+ * @param job Job.
+ * @return Internal future.
+ */
+ private IgniteInternalFuture<?> runAsync0(IgniteRunnable job) {
A.notNull(job, "job");
guard();
try {
- saveOrGet(ctx.closure().runAsync(BALANCE, job, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().runAsync(BALANCE, job, prj.nodes());
}
finally {
unguard();
@@ -428,15 +665,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void run(Collection<? extends IgniteRunnable> jobs) {
+ try {
+ saveOrGet(runAsync0(jobs));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> runAsync(Collection<? extends IgniteRunnable> jobs)
+ throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(runAsync0(jobs));
+ }
+
+ /**
+ * Run implementation.
+ *
+ * @param jobs Jobs.
+ * @return Internal future.
+ */
+ private IgniteInternalFuture<?> runAsync0(Collection<? extends IgniteRunnable> jobs) {
A.notEmpty(jobs, "jobs");
guard();
try {
- saveOrGet(ctx.closure().runAsync(BALANCE, jobs, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().runAsync(BALANCE, jobs, prj.nodes());
}
finally {
unguard();
@@ -445,15 +700,34 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R, T> R apply(IgniteClosure<T, R> job, @Nullable T arg) {
+ try {
+ return saveOrGet(applyAsync0(job, arg));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R> job, @Nullable T arg)
+ throws IgniteException {
+ return (IgniteFuture<R>)createFuture(applyAsync0(job, arg));
+ }
+
+ /**
+ * Apply implementation.
+ *
+ * @param job Job.
+ * @param arg Argument.
+ * @return Internal future.
+ */
+ private <R, T> IgniteInternalFuture<R> applyAsync0(IgniteClosure<T, R> job, @Nullable T arg) {
A.notNull(job, "job");
guard();
try {
- return saveOrGet(ctx.closure().callAsync(job, arg, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(job, arg, prj.nodes());
}
finally {
unguard();
@@ -462,15 +736,32 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> R call(IgniteCallable<R> job) {
+ try {
+ return saveOrGet(callAsync0(job));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job) throws IgniteException {
+ return (IgniteFuture<R>)createFuture(callAsync0(job));
+ }
+
+ /**
+ * Call implementation.
+ *
+ * @param job Job.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<R> callAsync0(IgniteCallable<R> job) {
A.notNull(job, "job");
guard();
try {
- return saveOrGet(ctx.closure().callAsync(BALANCE, job, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(BALANCE, job, prj.nodes());
}
finally {
unguard();
@@ -479,15 +770,33 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> Collection<R> call(Collection<? extends IgniteCallable<R>> jobs) {
+ try {
+ return saveOrGet(callAsync0(jobs));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<Collection<R>> callAsync(
+ Collection<? extends IgniteCallable<R>> jobs) throws IgniteException {
+ return (IgniteFuture<Collection<R>>)createFuture(callAsync0(jobs));
+ }
+
+ /**
+ * Call implementation.
+ *
+ * @param jobs Jobs.
+ * @return Internal future.
+ */
+ private <R> IgniteInternalFuture<Collection<R>> callAsync0(Collection<? extends IgniteCallable<R>> jobs) {
A.notEmpty(jobs, "jobs");
guard();
try {
- return saveOrGet(ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes());
}
finally {
unguard();
@@ -496,16 +805,36 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> Collection<R> apply(final IgniteClosure<T, R> job, @Nullable Collection<? extends T> args) {
+ try {
+ return saveOrGet(applyAsync0(job, args));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<Collection<R>> applyAsync(IgniteClosure<T, R> job,
+ Collection<? extends T> args) throws IgniteException {
+ return (IgniteFuture<Collection<R>>)createFuture(applyAsync0(job, args));
+ }
+
+ /**
+ * Apply implementation.
+ *
+ * @param job Job.
+ * @param args Arguments.
+ * @return Internal future.
+ */
+ private <T, R> IgniteInternalFuture<Collection<R>> applyAsync0(final IgniteClosure<T, R> job,
+ @Nullable Collection<? extends T> args) {
A.notNull(job, "job");
A.notNull(args, "args");
guard();
try {
- return saveOrGet(ctx.closure().callAsync(job, args, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(job, args, prj.nodes());
}
finally {
unguard();
@@ -514,16 +843,36 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R1, R2> R2 call(Collection<? extends IgniteCallable<R1>> jobs, IgniteReducer<R1, R2> rdc) {
+ try {
+ return saveOrGet(callAsync0(jobs, rdc));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends IgniteCallable<R1>> jobs,
+ IgniteReducer<R1, R2> rdc) throws IgniteException {
+ return (IgniteFuture<R2>)createFuture(callAsync0(jobs, rdc));
+ }
+
+ /**
+ * Call with reducer implementation.
+ *
+ * @param jobs Jobs.
+ * @param rdc Reducer.
+ * @return Internal future.
+ */
+ private <R1, R2> IgniteInternalFuture<R2> callAsync0(Collection<? extends IgniteCallable<R1>> jobs,
+ IgniteReducer<R1, R2> rdc) {
A.notEmpty(jobs, "jobs");
A.notNull(rdc, "rdc");
guard();
try {
- return saveOrGet(ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes());
}
finally {
unguard();
@@ -533,6 +882,30 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R1, R2, T> R2 apply(IgniteClosure<T, R1> job, Collection<? extends T> args,
IgniteReducer<R1, R2> rdc) {
+ try {
+ return saveOrGet(applyAsync0(job, args, rdc));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R1, R2, T> IgniteFuture<R2> applyAsync(IgniteClosure<T, R1> job,
+ Collection<? extends T> args, IgniteReducer<R1, R2> rdc) throws IgniteException {
+ return createFuture(applyAsync0(job, args, rdc));
+ }
+
+ /**
+ * Apply with reducer implementation.
+ *
+ * @param job Job.
+ * @param args Arguments.
+ * @param rdc Reducer.
+ * @return Internal future.
+ */
+ private <R1, R2, T> IgniteInternalFuture<R2> applyAsync0(IgniteClosure<T, R1> job, Collection<? extends T> args,
+ IgniteReducer<R1, R2> rdc) {
A.notNull(job, "job");
A.notNull(rdc, "rdc");
A.notNull(args, "args");
@@ -540,10 +913,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return saveOrGet(ctx.closure().callAsync(job, args, rdc, prj.nodes()));
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ return ctx.closure().callAsync(job, args, rdc, prj.nodes());
}
finally {
unguard();
@@ -647,7 +1017,8 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- ctx.deploy().undeployTask(taskName, prj.node(ctx.localNodeId()) != null, prj.forRemotes().nodes());
+ ctx.deploy().undeployTask(taskName, prj.node(ctx.localNodeId()) != null,
+ prj.forRemotes().nodes());
}
finally {
unguard();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index dec9b7d..2c1f035 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -212,7 +212,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
if (compute == null) {
assert ctx != null;
- compute = new IgniteComputeImpl(ctx, this, subjId, false);
+ compute = new IgniteComputeImpl(ctx, this, subjId);
}
return compute;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 5c4a147..3128c31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -575,11 +575,7 @@ public class JdbcConnection implements Connection {
throw new SQLException("Failed to establish connection with node (is it a server node?): " +
nodeId);
- IgniteCompute compute = ignite.compute(grp).withAsync();
-
- compute.call(task);
-
- return compute.<Boolean>future().get(timeout, SECONDS);
+ return ignite.compute(grp).callAsync(task).get(timeout, SECONDS);
}
else
return task.call();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 9d9a4d2..2b2a78a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -240,9 +240,10 @@ public class PlatformCompute extends PlatformAbstractTarget {
* Execute task.
*
* @param task Task.
+ * @return Target.
*/
private PlatformTarget executeNative0(final PlatformAbstractTask task) {
- IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
+ IgniteInternalFuture fut = computeForPlatform.executeAsync0(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
private static final long serialVersionUID = 0L;
@@ -266,7 +267,9 @@ public class PlatformCompute extends PlatformAbstractTarget {
* Execute task taking arguments from the given reader.
*
* @param reader Reader.
+ * @param async Execute asynchronously flag.
* @return Task result.
+ * @throws IgniteCheckedException On error.
*/
protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) throws IgniteCheckedException {
String taskName = reader.readString();
@@ -277,18 +280,13 @@ public class PlatformCompute extends PlatformAbstractTarget {
IgniteCompute compute0 = computeForTask(nodeIds);
- if (async)
- compute0 = compute0.withAsync();
-
if (!keepBinary && arg instanceof BinaryObjectImpl)
arg = ((BinaryObject)arg).deserialize();
- Object res = compute0.execute(taskName, arg);
-
if (async)
- return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.future()));
+ return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.executeAsync(taskName, arg)));
else
- return toBinary(res);
+ return toBinary(compute0.execute(taskName, arg));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
index d4755de..eb675fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
@@ -187,11 +187,10 @@ public class PlatformDotNetEntityFrameworkCacheExtension implements PlatformCach
final ClusterGroup dataNodes = grid.cluster().forDataNodes(dataCacheName);
- IgniteCompute asyncCompute = grid.compute(dataNodes).withAsync();
+ IgniteFuture f = grid.compute(dataNodes).broadcastAsync(
+ new RemoveOldEntriesRunnable(dataCacheName, currentVersions));
- asyncCompute.broadcast(new RemoveOldEntriesRunnable(dataCacheName, currentVersions));
-
- asyncCompute.future().listen(new CleanupCompletionListener(metaCache, dataCacheName));
+ f.listen(new CleanupCompletionListener(metaCache, dataCacheName));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 1f1a6fb..faac65b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -95,11 +95,9 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
* @return {@code true} If subJob was not completed and this job should be suspended.
*/
private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
- IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
+ IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName));
- compute.call(subJob);
-
- IgniteFuture<Integer> fut = compute.future();
+ IgniteFuture<Integer> fut = compute.callAsync(subJob);
futs[idx] = fut;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
index 50a8700..3e31b51 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
@@ -18,25 +18,75 @@
package org.apache.ignite.lang;
/**
- * Allows to enable asynchronous mode on Ignite APIs.
+ * Allows enabling asynchronous mode on Ignite APIs. Prefer specialized async methods, e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync(key);
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
+ * @deprecated since 2.0. Please use specialized asynchronous methods.
*/
+@Deprecated
public interface IgniteAsyncSupport {
/**
* Gets instance of this component with asynchronous mode enabled.
*
* @return Instance of this component with asynchronous mode enabled.
+ *
+ * @deprecated since 2.0. Please use the new specialized async methods,
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync(key);
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public IgniteAsyncSupport withAsync();
/**
* @return {@code True} if asynchronous mode is enabled.
+ *
+ * @deprecated since 2.0. Please use the new specialized async methods,
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync(key);
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public boolean isAsync();
/**
* Gets and resets future for previous asynchronous operation.
*
* @return Future for previous asynchronous operation.
+ *
+ * @deprecated since 2.0. Please use the new specialized async methods,
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync(key);
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public <R> IgniteFuture<R> future();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a5969cd3/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
index 1bb7162..2dfea51 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
@@ -31,11 +31,13 @@ import java.lang.annotation.Target;
*
* TODO coding example.
*
+ * @deprecated since 2.0. Please use specialized asynchronous methods.
* @see IgniteAsyncSupport
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
+@Deprecated
public @interface IgniteAsyncSupported {
-
+ // No-op.
}
\ No newline at end of file