You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/29 14:02:35 UTC
[11/50] [abbrv] ignite git commit: IGNITE-4475: New async API: now
all async methods are defined explicitly,
IgniteAsyncSupport is deprecated. This closes #1648.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java
index 485e811..231fc9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest;
@@ -199,6 +201,30 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testExecuteTaskClassAsync() throws Exception {
+ runTest(jobFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ // Begin with negative to check 'null' value in the test.
+ final int[] i = {-1};
+
+ ComputeTaskFuture<List<Object>> fut = ignite.compute().executeAsync(
+ TestTask.class,
+ new T2<>((Factory<ComputeJobAdapter>)factory,
+ (Factory<Object>)new Factory<Object>() {
+ @Override public Object create() {
+ return value(i[0]++);
+ }
+ }));
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testExecuteTask() throws Exception {
runTest(jobFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -222,6 +248,29 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testExecuteTaskAsync() throws Exception {
+ runTest(jobFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ // Begin with negative to check 'null' value in the test.
+ final int[] i = {-1};
+
+ ComputeTaskFuture<List<Object>> fut = ignite.compute().executeAsync(new TestTask(),
+ new T2<>((Factory<ComputeJobAdapter>)factory,
+ (Factory<Object>)new Factory<Object>() {
+ @Override public Object create() {
+ return value(i[0]++);
+ }
+ }));
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testBroadcastClosure() throws Exception {
runTest(closureFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -245,6 +294,29 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testBroadcastClosureAsync() throws Exception {
+ runTest(closureFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ // The null-argument broadcast must go through the async API too, not the sync one.
+ final Collection<Object> resultsAllNull = ignite.compute()
+ .broadcastAsync((IgniteClosure<Object, Object>)factory.create(), null).get();
+ assertEquals("Result's size mismatch: job must be run on all server nodes",
+ gridCount() - clientsCount(), resultsAllNull.size());
+
+ for (Object o : resultsAllNull)
+ assertNull("All results must be null", o);
+
+ IgniteFuture<Collection<Object>> fut = ignite.compute()
+ .broadcastAsync((IgniteClosure<Object, Object>)factory.create(), value(0));
+
+ checkResultsClassCount(gridCount() - clientsCount(), fut.get(), value(0).getClass());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testBroadcastCallable() throws Exception {
runTest(callableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -274,6 +346,35 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testBroadcastCallableAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ EchoCallable job = (EchoCallable)factory.create();
+ job.setArg(null);
+
+ final IgniteFuture<Collection<Object>> futAllNull = ignite.compute()
+ .broadcastAsync(job);
+
+ assertEquals("Result's size mismatch: job must be run on all server nodes",
+ gridCount() - clientsCount(), futAllNull.get().size());
+
+ for (Object o : futAllNull.get())
+ assertNull("All results must be null", o);
+
+ job.setArg(value(0));
+ IgniteFuture<Collection<Object>> futNotNull = ignite.compute()
+ .broadcastAsync(job);
+
+ checkResultsClassCount(gridCount() - clientsCount(), futNotNull.get(), value(0).getClass());
+ for (Object o : futNotNull.get())
+ assertEquals("Invalid broadcast results", value(0), o);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testBroadcastRunnable() throws Exception {
runTest(runnableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -288,6 +389,22 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testBroadcastRunnableAsync() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ IgniteFuture<Void> fut = ignite.compute().broadcastAsync(job);
+
+ fut.get();
+ // All checks are inside the run() method of the job.
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRun() throws Exception {
runTest(runnableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -310,24 +427,48 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testRunAsync() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ IgniteFuture<Void> fut0 = ignite.compute().runAsync(job);
+
+ fut0.get();
+ // All checks are inside the run() method of the job.
+
+ Collection<IgniteRunnable> jobs = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i)
+ jobs.add((IgniteRunnable)factory.create());
+
+ IgniteFuture<Void> fut1 = ignite.compute().runAsync(jobs);
+
+ fut1.get();
+ // All checks are inside the run() method of the job.
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testApplyAsync() throws Exception {
runTest(closureFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
- final IgniteCompute comp = ignite.compute().withAsync();
+ final IgniteCompute comp = ignite.compute();
- Collection<ComputeTaskFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT);
+ Collection<IgniteFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT);
for (int i = 0; i < MAX_JOB_COUNT; ++i) {
// value(i - 1): use negative argument of the value method to generate nullong value.
- comp.apply((IgniteClosure<Object, Object>)factory.create(), value(i - 1));
-
- futures.add(comp.future());
+ futures.add(comp.applyAsync((IgniteClosure<Object, Object>)factory.create(), value(i - 1)));
}
// Wait for results.
Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
- for (ComputeTaskFuture<Object> future : futures)
+ for (IgniteFuture<Object> future : futures)
results.add(future.get());
checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
@@ -339,7 +480,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
- public void testApplySync() throws Exception {
+ public void testApply() throws Exception {
runTest(closureFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
@@ -383,6 +524,32 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testApplyForCollectionAsync() throws Exception {
+ runTest(closureFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ Collection params = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ // value(i - 1): use negative argument of the value method to generate a null value.
+ params.add(value(i - 1));
+ }
+
+ IgniteClosure c = (IgniteClosure)factory.create();
+
+ // Use type casting to avoid ambiguity between applyAsync(IgniteClosure, Object) and applyAsync(IgniteClosure, Collection<Object>).
+ IgniteFuture<Collection<Object>> fut = ignite.compute().applyAsync(
+ (IgniteClosure<TestObject, Object>)c,
+ (Collection<TestObject>)params);
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testApplyForCollectionWithReducer() throws Exception {
runTest(closureFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -409,6 +576,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
return true;
}
});
+
assertTrue(res);
}
});
@@ -417,24 +585,58 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testApplyForCollectionWithReducerAsync() throws Exception {
+ runTest(closureFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ Collection<Object> params = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ // value(i - 1): use negative argument of the value method to generate a null value.
+ params.add(value(i - 1));
+ }
+
+ IgniteFuture<Boolean> fut = ignite.compute()
+ .applyAsync((IgniteClosure<Object, Object>)factory.create(), params, new IgniteReducer<Object, Boolean>() {
+
+ private Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ @Override public boolean collect(@Nullable Object o) {
+ results.add(o);
+ return true;
+ }
+
+ @Override public Boolean reduce() {
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ return true;
+ }
+ });
+
+ assertTrue(fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCallAsync() throws Exception {
runTest(callableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
- final IgniteCompute comp = ignite.compute().withAsync();
+ final IgniteCompute comp = ignite.compute();
- Collection<ComputeTaskFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT);
+ Collection<IgniteFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT);
for (int i = 0; i < MAX_JOB_COUNT; ++i) {
EchoCallable job = (EchoCallable)factory.create();
job.setArg(value(i - 1));
- comp.call(job);
- futures.add(comp.future());
+ futures.add(comp.callAsync(job));
}
// Wait for results.
Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
- for (ComputeTaskFuture<Object> future : futures)
+ for (IgniteFuture<Object> future : futures)
results.add(future.get());
checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
@@ -446,7 +648,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
- public void testCallSync() throws Exception {
+ public void testCall() throws Exception {
runTest(callableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
@@ -488,6 +690,28 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testCallCollectionAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ Collection<EchoCallable> jobs = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+ job.setArg(value(i - 1));
+ jobs.add(job);
+ }
+
+ IgniteFuture<Collection<Object>> fut = ignite.compute().callAsync(jobs);
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCallCollectionWithReducer() throws Exception {
runTest(callableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
@@ -522,7 +746,41 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
- public void testDummyAffinityCall() throws Exception {
+ public void testCallCollectionWithReducerAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ Collection<EchoCallable> jobs = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+ job.setArg(value(i - 1));
+ jobs.add(job);
+ }
+
+ IgniteFuture<Boolean> fut = ignite.compute().callAsync(jobs, new IgniteReducer<Object, Boolean>() {
+ private Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ @Override public boolean collect(@Nullable Object o) {
+ results.add(o);
+ return true;
+ }
+
+ @Override public Boolean reduce() {
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ return true;
+ }
+ });
+
+ assertTrue(fut.get());
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCall() throws Exception {
runTest(callableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
ignite.getOrCreateCache(CACHE_NAME);
@@ -548,7 +806,147 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
- public void testDummyAffinityRun() throws Exception {
+ public void testAffinityCallAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache(CACHE_NAME);
+
+ final IgniteCompute comp = ignite.compute();
+
+ Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+
+ job.setArg(value(i - 1));
+
+ IgniteFuture<Object> fut = comp.affinityCallAsync("test", key(0), job);
+
+ results.add(fut.get());
+ }
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheAffinityCall() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+
+ job.setArg(value(i - 1));
+
+ results.add(comp.affinityCall(Arrays.asList("test0", "test1"), key(0), job));
+ }
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheAffinityCallAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+
+ job.setArg(value(i - 1));
+
+ IgniteFuture<Object> fut = comp.affinityCallAsync(Arrays.asList("test0", "test1"), key(0), job);
+
+ results.add(fut.get());
+ }
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheByPartIdAffinityCall() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+
+ job.setArg(value(i - 1));
+
+ results.add(comp.affinityCall(Arrays.asList("test0", "test1"), 0, job));
+ }
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheByPartIdAffinityCallAsync() throws Exception {
+ runTest(callableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT);
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ EchoCallable job = (EchoCallable)factory.create();
+
+ job.setArg(value(i - 1));
+ // The int literal 0 selects the partition-id overload rather than the affinity-key one.
+ IgniteFuture<Object> fut = comp.affinityCallAsync(Arrays.asList("test0", "test1"), 0, job);
+
+ results.add(fut.get());
+ }
+
+ checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass());
+ assertCollectionsEquals("Results value mismatch", createGoldenResults(), results);
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityRun() throws Exception {
runTest(runnableFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
ignite.getOrCreateCache(CACHE_NAME);
@@ -567,6 +965,111 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat
/**
* @throws Exception If failed.
*/
+ public void testAffinityRunAsync() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache(CACHE_NAME);
+
+ final IgniteCompute comp = ignite.compute();
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ IgniteFuture<Void> fut = comp.affinityRunAsync("test", key(0), job);
+
+ fut.get();
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheAffinityRun() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ comp.affinityRun(Arrays.asList("test0", "test1"), key(0), job);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheAffinityRunAsync() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ IgniteFuture<Void> fut = comp.affinityRunAsync(Arrays.asList("test0", "test1"), key(0), job);
+
+ fut.get();
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheByPartIdAffinityRun() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ comp.affinityRun(Arrays.asList("test0", "test1"), 0, job);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiCacheByPartIdAffinityRunAsync() throws Exception {
+ runTest(runnableFactories, new ComputeTest() {
+ @Override public void test(Factory factory, Ignite ignite) throws Exception {
+ ignite.getOrCreateCache("test0");
+ ignite.getOrCreateCache("test1");
+
+ final IgniteCompute comp = ignite.compute();
+
+ for (int i = 0; i < MAX_JOB_COUNT; ++i) {
+ IgniteRunnable job = (IgniteRunnable)factory.create();
+
+ IgniteFuture<Void> fut = comp.affinityRunAsync(Arrays.asList("test0", "test1"), 0, job);
+
+ fut.get();
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testDeployExecuteByName() throws Exception {
runTest(jobFactories, new ComputeTest() {
@Override public void test(Factory factory, Ignite ignite) throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index b50dfb7..1ee4744 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -251,6 +250,175 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testApiAsyncOld() throws Exception {
+ IgniteEvents evtAsync = grid(0).events().withAsync();
+
+ try {
+ evtAsync.stopRemoteListen(null);
+ evtAsync.future().get();
+ }
+ catch (NullPointerException ignored) {
+ // No-op.
+ }
+
+ evtAsync.stopRemoteListen(UUID.randomUUID());
+ evtAsync.future().get();
+
+ UUID consumeId = null;
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ },
+ EVTS_DISCOVERY
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ }
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, Event>() {
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return false;
+ }
+ },
+ new P1<Event>() {
+ @Override public boolean apply(Event e) {
+ return false;
+ }
+ }
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testApiAsync() throws Exception {
+ IgniteEvents evt = grid(0).events();
+
+ try {
+ evt.stopRemoteListenAsync(null).get();
+ }
+ catch (NullPointerException ignored) {
+ // No-op.
+ }
+
+ evt.stopRemoteListenAsync(UUID.randomUUID()).get();
+
+ UUID consumeId = null;
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ },
+ EVTS_DISCOVERY
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ }
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, Event>() {
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return false;
+ }
+ },
+ new P1<Event>() {
+ @Override public boolean apply(Event e) {
+ return false;
+ }
+ }
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAllEvents() throws Exception {
final Collection<UUID> nodeIds = new HashSet<>();
final AtomicInteger cnt = new AtomicInteger();
@@ -990,15 +1158,13 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
int idx = rnd.nextInt(GRID_CNT);
try {
- IgniteEvents evts = grid(idx).events().withAsync();
+ IgniteEvents evts = grid(idx).events();
- evts.remoteListen(new P2<UUID, Event>() {
+ UUID consumeId = evts.remoteListenAsync(new P2<UUID, Event>() {
@Override public boolean apply(UUID uuid, Event evt) {
return true;
}
- }, null, EVT_JOB_STARTED);
-
- UUID consumeId = evts.<UUID>future().get(3000);
+ }, null, EVT_JOB_STARTED).get(3000);
started.add(consumeId);
@@ -1029,11 +1195,9 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
UUID consumeId = t.get2();
try {
- IgniteEvents evts = grid(idx).events().withAsync();
-
- evts.stopRemoteListen(consumeId);
+ IgniteEvents evts = grid(idx).events();
- evts.future().get(3000);
+ evts.stopRemoteListenAsync(consumeId).get(3000);
stopped.add(consumeId);
}
@@ -1063,11 +1227,7 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
int idx = rnd.nextInt(GRID_CNT);
try {
- IgniteCompute comp = grid(idx).compute().withAsync();
-
- comp.run(F.noop());
-
- comp.future().get(3000);
+ grid(idx).compute().runAsync(F.noop()).get(3000);
}
catch (IgniteException ignored) {
// Ignore all job execution related errors.
@@ -1089,11 +1249,7 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
int idx = t.get1();
UUID consumeId = t.get2();
- IgniteEvents evts = grid(idx).events().withAsync();
-
- evts.stopRemoteListen(consumeId);
-
- evts.future().get(3000);
+ grid(idx).events().stopRemoteListenAsync(consumeId).get(3000);
stopped.add(consumeId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 04c67dc..1217005 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -266,6 +266,13 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> formatAsync() throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
throwUnsupported();
@@ -274,6 +281,14 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws IgniteException {
@@ -283,6 +298,15 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
throwUnsupported();
@@ -291,6 +315,14 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
throws IgniteException {
@@ -300,6 +332,15 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
throwUnsupported();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
index 0d468b4..36c99dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
@@ -168,6 +168,25 @@ public class IgfsTaskSelfTest extends IgfsCommonAbstractTest {
}
/**
+ * Test task.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void testTaskAsync() throws Exception {
+ String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)];
+
+ generateFile(TOTAL_WORDS);
+ Long genLen = igfs.info(FILE).length();
+
+ IgniteBiTuple<Long, Integer> taskRes = igfs.executeAsync(new Task(),
+ new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg).get();
+
+ assert F.eq(genLen, taskRes.getKey());
+ assert F.eq(TOTAL_WORDS, taskRes.getValue());
+ }
+
+ /**
* Generate file with random data and provided argument.
*
* @param wordCnt Word count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index 3e90a52..3e547d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -186,18 +186,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
public void testOrderedMessage() throws Exception {
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- orderedMessage(false);
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testOrderedMessageAsync() throws Exception {
- runInAllDataModes(new TestRunnable() {
- @Override public void run() throws Exception {
- orderedMessage(true);
+ orderedMessage();
}
});
}
@@ -211,26 +200,11 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientServerOrderedMessage(false);
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientServerOrderedMessageAsync() throws Exception {
- if (!testsCfg.withClients())
- return;
-
- runInAllDataModes(new TestRunnable() {
- @Override public void run() throws Exception {
- clientServerOrderedMessage(true);
+ clientServerOrderedMessage();
}
});
}
-
/**
* @throws Exception If failed.
*/
@@ -240,21 +214,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- clientClientOrderedMessage(false);
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientClientOrderedMessageAsync() throws Exception {
- if (!testsCfg.withClients())
- return;
-
- runInAllDataModes(new TestRunnable() {
- @Override public void run() throws Exception {
- clientClientOrderedMessage(true);
+ clientClientOrderedMessage();
}
});
}
@@ -268,21 +228,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
runInAllDataModes(new TestRunnable() {
@Override public void run() throws Exception {
- serverClientOrderedMessage(false);
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerClientOrderedMessageAsync() throws Exception {
- if (!testsCfg.withClients())
- return;
-
- runInAllDataModes(new TestRunnable() {
- @Override public void run() throws Exception {
- serverClientOrderedMessage(true);
+ serverClientOrderedMessage();
}
});
}
@@ -451,68 +397,63 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
}
/**
- * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void orderedMessage(boolean async) throws Exception {
+ private void orderedMessage() throws Exception {
Ignite ignite = grid(SERVER_NODE_IDX);
ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp, async);
+ registerListenerAndSendOrderedMessages(ignite, grp);
}
/**
- * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void clientServerOrderedMessage(boolean async) throws Exception {
+ private void clientServerOrderedMessage() throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forServers();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp, async);
+ registerListenerAndSendOrderedMessages(ignite, grp);
}
/**
- * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void clientClientOrderedMessage(boolean async) throws Exception {
+ private void clientClientOrderedMessage() throws Exception {
Ignite ignite = grid(CLIENT_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp, async);
+ registerListenerAndSendOrderedMessages(ignite, grp);
}
/**
- * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void serverClientOrderedMessage(boolean async) throws Exception {
+ private void serverClientOrderedMessage() throws Exception {
Ignite ignite = grid(SERVER_NODE_IDX);
ClusterGroup grp = ignite.cluster().forClients();
assert grp.nodes().size() > 0;
- registerListenerAndSendOrderedMessages(ignite, grp, async);
+ registerListenerAndSendOrderedMessages(ignite, grp);
}
/**
* @param ignite Ignite.
* @param grp Cluster group.
- * @param async Async message send flag.
* @throws Exception If fail.
*/
- private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
+ private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
int messages = MSGS;
LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -520,12 +461,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());
try {
- for (int i=0; i < messages; i++){
- if (async)
- ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000);
- else
- ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
- }
+ for (int i=0; i < messages; i++)
+ ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
assertTrue(LATCH.await(10, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
index c6505ba..f9d1632 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
@@ -147,8 +147,8 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testSameConfiguration() throws Exception {
- String name = "dupService";
+ public void testSameConfigurationOld() throws Exception {
+ String name = "dupServiceOld";
IgniteServices svcs1 = randomGrid().services().withAsync();
IgniteServices svcs2 = randomGrid().services().withAsync();
@@ -176,8 +176,33 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testDifferentConfiguration() throws Exception {
- String name = "dupService";
+ public void testSameConfiguration() throws Exception {
+ String name = "dupService";
+
+ IgniteServices svcs1 = randomGrid().services();
+ IgniteServices svcs2 = randomGrid().services();
+
+ IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService());
+
+ IgniteFuture<?> fut2 = svcs2.deployClusterSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut1.get();
+
+ info("Finished waiting for service future1: " + name);
+
+ // Both deployments use an identical configuration, so the second future must also complete without an exception.
+ fut2.get();
+
+ info("Finished waiting for service future2: " + name);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDifferentConfigurationOld() throws Exception {
+ String name = "dupServiceOld";
IgniteServices svcs1 = randomGrid().services().withAsync();
IgniteServices svcs2 = randomGrid().services().withAsync();
@@ -209,6 +234,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDifferentConfiguration() throws Exception {
+ String name = "dupService";
+
+ IgniteServices svcs1 = randomGrid().services();
+ IgniteServices svcs2 = randomGrid().services();
+
+ IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService());
+
+ IgniteFuture<?> fut2 = svcs2.deployNodeSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut1.get();
+
+ info("Finished waiting for service future: " + name);
+
+ try {
+ fut2.get();
+
+ fail("Failed to receive mismatching configuration exception.");
+ }
+ catch (IgniteException e) {
+ info("Received mismatching configuration exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testGetServiceByName() throws Exception {
String name = "serviceByName";
@@ -255,10 +309,10 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testDeployOnEachNode() throws Exception {
+ public void testDeployOnEachNodeOld() throws Exception {
Ignite g = randomGrid();
- String name = "serviceOnEachNode";
+ String name = "serviceOnEachNodeOld";
CountDownLatch latch = new CountDownLatch(nodeCount());
@@ -287,10 +341,38 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testDeploySingleton() throws Exception {
+ public void testDeployOnEachNode() throws Exception {
Ignite g = randomGrid();
- String name = "serviceSingleton";
+ String name = "serviceOnEachNode";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount());
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployNodeSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeploySingletonOld() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceSingletonOld";
CountDownLatch latch = new CountDownLatch(1);
@@ -319,7 +401,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testAffinityDeploy() throws Exception {
+ public void testDeploySingleton() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceSingleton";
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployClusterSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityDeployOld() throws Exception {
Ignite g = randomGrid();
final Integer affKey = 1;
@@ -327,7 +437,7 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
// Store a cache key.
g.cache(CACHE_NAME).put(affKey, affKey.toString());
- String name = "serviceAffinity";
+ String name = "serviceAffinityOld";
IgniteServices svcs = g.services().withAsync();
@@ -348,10 +458,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testDeployMultiple1() throws Exception {
+ public void testAffinityDeploy() throws Exception {
Ignite g = randomGrid();
- String name = "serviceMultiple1";
+ final Integer affKey = 1;
+
+ // Store a cache key.
+ g.cache(CACHE_NAME).put(affKey, affKey.toString());
+
+ String name = "serviceAffinity";
+
+ IgniteFuture<?> fut = g.services().deployKeyAffinitySingletonAsync(name, new AffinityService(affKey),
+ CACHE_NAME, affKey);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ checkCount(name, g.services().serviceDescriptors(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployMultiple1Old() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceMultiple1Old";
CountDownLatch latch = new CountDownLatch(nodeCount() * 2);
@@ -380,10 +515,38 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
- public void testDeployMultiple2() throws Exception {
+ public void testDeployMultiple1() throws Exception {
Ignite g = randomGrid();
- String name = "serviceMultiple2";
+ String name = "serviceMultiple1";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount() * 2);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), nodeCount() * 2, 3);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount() * 2, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount() * 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployMultiple2Old() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceMultiple2Old";
int cnt = nodeCount() * 2 + 1;
@@ -414,6 +577,36 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDeployMultiple2() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceMultiple2";
+
+ int cnt = nodeCount() * 2 + 1;
+
+ CountDownLatch latch = new CountDownLatch(cnt);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), cnt, 3);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, cnt, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), cnt);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCancelSingleton() throws Exception {
Ignite g = randomGrid();
@@ -449,6 +642,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testCancelSingletonAsync() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceCancelAsync";
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ g.services().deployClusterSingleton(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ latch = new CountDownLatch(1);
+
+ DummyService.cancelLatch(name, latch);
+
+ g.services().cancelAsync(name).get();
+
+ info("Cancelled service: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 1, DummyService.cancelled(name));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCancelEachNode() throws Exception {
Ignite g = randomGrid();
@@ -482,6 +710,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
}
/**
+ * @throws Exception If failed.
+ */
+ public void testCancelAsyncEachNode() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceCancelEachNodeAsync";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount());
+
+ DummyService.exeLatch(name, latch);
+
+ g.services().deployNodeSingleton(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ latch = new CountDownLatch(nodeCount());
+
+ DummyService.cancelLatch(name, latch);
+
+ g.services().cancelAsync(name).get();
+
+ info("Cancelled service: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, nodeCount(), DummyService.cancelled(name));
+ }
+
+ /**
* @param svcName Service name.
* @param descs Descriptors.
* @param cnt Expected count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
index 39336ef..9b787a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
@@ -44,11 +44,9 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
DummyService.exeLatch(name, latch);
- IgniteServices svcs = g.services().withAsync();
+ IgniteServices svcs = g.services();
- svcs.deployClusterSingleton(name, new DummyService());
-
- IgniteFuture<?> fut = svcs.future();
+ IgniteFuture<?> fut = svcs.deployClusterSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
@@ -91,13 +89,11 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
String name = "serviceAffinityUpdateTopology";
- IgniteServices svcs = g.services().withAsync();
+ IgniteServices svcs = g.services();
- svcs.deployKeyAffinitySingleton(name, new AffinityService(affKey),
+ IgniteFuture<?> fut = svcs.deployKeyAffinitySingletonAsync(name, new AffinityService(affKey),
CACHE_NAME, affKey);
- IgniteFuture<?> fut = svcs.future();
-
info("Deployed service: " + name);
fut.get();
@@ -130,11 +126,9 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
DummyService.exeLatch(name, latch);
- IgniteServices svcs = g.services().withAsync();
-
- svcs.deployNodeSingleton(name, new DummyService());
+ IgniteServices svcs = g.services();
- IgniteFuture<?> fut = svcs.future();
+ IgniteFuture<?> fut = svcs.deployNodeSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 03b00f4..8eefa20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
@@ -62,14 +63,12 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
@Override public Void call() throws Exception {
IgniteServices svcs = ignite.services();
- IgniteServices services = svcs.withAsync();
-
- services.deployClusterSingleton("myClusterSingletonService", new TestServiceImpl());
+ IgniteFuture f = svcs.deployClusterSingletonAsync("myClusterSingletonService", new TestServiceImpl());
depLatch.countDown();
try {
- services.future().get();
+ f.get();
}
catch (IgniteException ignored) {
finishLatch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java b/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java
index 272c7ad..559cfc9 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.compute.ComputeJob;
@@ -89,9 +88,8 @@ public final class GridSingleExecutionTest {
System.exit(1);
}
else if (args.length >= 2) {
- for (IgniteConfiguration cfg: getConfigurations(args[1], args[0])) {
+ for (IgniteConfiguration cfg: getConfigurations(args[1], args[0]))
G.start(cfg);
- }
}
boolean useSes = false;
@@ -104,12 +102,8 @@ public final class GridSingleExecutionTest {
try {
Ignite ignite = G.ignite();
- IgniteCompute comp = ignite.compute().withAsync();
-
// Execute Hello World task.
- comp.execute(!useSes ? TestTask.class : TestSessionTask.class, null);
-
- ComputeTaskFuture<Object> fut = comp.future();
+ ComputeTaskFuture<Object> fut = ignite.compute().executeAsync(!useSes ? TestTask.class : TestSessionTask.class, null);
if (useSes) {
fut.getTaskSession().setAttribute("attr1", 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
index 6a43fee..bf34545 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Executors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -88,13 +87,11 @@ public class GridTestMain {
long start = System.currentTimeMillis();
- IgniteCompute comp = g.compute().withAsync();
-
// Collocate computations and data.
for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) {
final long key = i;
- comp.affinityRun("partitioned", GridTestKey.affinityKey(key), new IgniteRunnable() {
+ final IgniteFuture<?> f = g.compute().affinityRunAsync("partitioned", GridTestKey.affinityKey(key), new IgniteRunnable() {
// This code will execute on remote nodes by collocating keys with cached data.
@Override public void run() {
Long val = cache.localPeek(new GridTestKey(key), CachePeekMode.ONHEAP);
@@ -104,8 +101,6 @@ public class GridTestMain {
}
});
- final IgniteFuture<?> f = comp.future();
-
q.put(f);
f.listen(new CI1<IgniteFuture<?>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java
index ab6b272..c764f67 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.loadtests.direct.multisplit;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -106,8 +105,6 @@ public class GridMultiSplitsLoadTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(new Runnable() {
/** {@inheritDoc} */
@Override public void run() {
- IgniteCompute comp = ignite.compute().withAsync();
-
while (end - System.currentTimeMillis() > 0) {
int levels = 3;
@@ -116,9 +113,7 @@ public class GridMultiSplitsLoadTest extends GridCommonAbstractTest {
long start = System.currentTimeMillis();
try {
- comp.execute(GridLoadTestTask.class, levels);
-
- ComputeTaskFuture<Integer> fut = comp.future();
+ ComputeTaskFuture<Integer> fut = ignite.compute().executeAsync(GridLoadTestTask.class, levels);
int res = fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java
index 5d909c9..6fb7cdf 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.loadtests.direct.newnodes;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.G;
@@ -141,8 +140,6 @@ public abstract class GridSingleSplitsNewNodesAbstractLoadTest extends GridCommo
GridTestUtils.runMultiThreaded(new Runnable() {
/** {@inheritDoc} */
@Override public void run() {
- IgniteCompute comp = ignite.compute().withAsync();
-
while (end - System.currentTimeMillis() > 0
&& !Thread.currentThread().isInterrupted()) {
long start = System.currentTimeMillis();
@@ -150,9 +147,8 @@ public abstract class GridSingleSplitsNewNodesAbstractLoadTest extends GridCommo
try {
int levels = 3;
- comp.execute(new GridSingleSplitNewNodesTestTask(), levels);
-
- ComputeTaskFuture<Integer> fut = comp.future();
+ ComputeTaskFuture<Integer> fut = ignite.compute().executeAsync(
+ new GridSingleSplitNewNodesTestTask(), levels);
int res = fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java
index a065580..9662882 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java
@@ -114,7 +114,7 @@ public class GridDsiClient implements Callable {
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "InfiniteLoopStatement"})
@Nullable @Override public Object call() throws Exception {
- IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode())).withAsync();
+ IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode()));
while (!finish.get()) {
try {
@@ -122,9 +122,8 @@ public class GridDsiClient implements Callable {
long submitTime1 = t0;
- comp.execute(GridDsiRequestTask.class, new GridDsiMessage(terminalId, nodeId));
-
- ComputeTaskFuture<T3<Long, Integer, Integer>> f1 = comp.future();
+ ComputeTaskFuture<T3<Long, Integer, Integer>> f1 = comp.executeAsync(
+ GridDsiRequestTask.class, new GridDsiMessage(terminalId, nodeId));
submitTime.setIfGreater(System.currentTimeMillis() - submitTime1);
@@ -132,9 +131,8 @@ public class GridDsiClient implements Callable {
submitTime1 = System.currentTimeMillis();
- comp.execute(GridDsiResponseTask.class, new GridDsiMessage(terminalId, nodeId));
-
- ComputeTaskFuture<T3<Long, Integer, Integer>> f2 = comp.future();
+ ComputeTaskFuture<T3<Long, Integer, Integer>> f2 = comp.executeAsync(
+ GridDsiResponseTask.class, new GridDsiMessage(terminalId, nodeId));
submitTime.setIfGreater(System.currentTimeMillis() - submitTime1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java
index 8e55ff9..53c6f50 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java
@@ -29,7 +29,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.util.lang.GridAbsClosure;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.G;
@@ -75,16 +74,10 @@ public class GridJobExecutionLoadTestClientSemaphore implements Callable<Object>
ClusterGroup rmts = g.cluster().forRemotes();
- IgniteCompute comp = g.compute(rmts).withAsync();
-
while (!finish) {
tasksSem.acquire();
- comp.execute(GridJobExecutionLoadTestTask.class, null);
-
- ComputeTaskFuture<Object> f = comp.future();
-
- f.listen(lsnr);
+ g.compute(rmts).executeAsync(GridJobExecutionLoadTestTask.class, null).listen(lsnr);
txCnt.increment();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java
index 2e2ab20..2f94b48 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java
@@ -24,13 +24,11 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -188,13 +186,7 @@ public class GridJobExecutionSingleNodeSemaphoreLoadTest {
@Nullable @Override public Object call() throws Exception {
sem.acquire();
- IgniteCompute comp = g.compute().withAsync();
-
- comp.execute(GridJobExecutionLoadTestTask.class, null);
-
- ComputeTaskFuture<Object> f = comp.future();
-
- f.listen(lsnr);
+ g.compute().executeAsync(GridJobExecutionLoadTestTask.class, null).listen(lsnr);
iterCntr.increment();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java
index 8dcd828..16a6af8 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
@@ -68,8 +67,6 @@ public class GridJobLoadTestSubmitter implements Runnable {
/** {@inheritDoc} */
@SuppressWarnings("BusyWait")
@Override public void run() {
- IgniteCompute comp = ignite.compute().withAsync();
-
while (true) {
checkCompletion();
@@ -83,9 +80,7 @@ public class GridJobLoadTestSubmitter implements Runnable {
}
try {
- comp.withTimeout(TIMEOUT).execute(GridJobLoadTestTask.class, params);
-
- futures.add(comp.<Integer>future());
+ futures.add(ignite.compute().withTimeout(TIMEOUT).executeAsync(GridJobLoadTestTask.class, params));
}
catch (IgniteException e) {
// Should not be thrown since uses asynchronous execution.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java
index 8c8f039..cb609ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
@@ -77,12 +76,8 @@ public class GridMergeSortLoadTask extends ComputeTaskSplitAdapter<int[], int[]>
// Future is null before holdcc() is called and
// not null after callcc() is called.
if (fut == null) {
- IgniteCompute comp = ignite.compute().withAsync();
-
// Launch the recursive child task asynchronously.
- comp.execute(new GridMergeSortLoadTask(), arr);
-
- fut = comp.future();
+ fut = ignite.compute().executeAsync(new GridMergeSortLoadTask(), arr);
// Add a listener to the future, that will resume the
// parent task once the child one is completed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 3f66c5d..b7ddc3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -1031,7 +1031,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/**
* @throws Exception If failed.
*/
- public void testAsync() throws Exception {
+ public void testAsyncOld() throws Exception {
final AtomicInteger msgCnt = new AtomicInteger();
TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
@@ -1137,6 +1137,76 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAsync() throws Exception {
+ final AtomicInteger msgCnt = new AtomicInteger();
+
+ TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+
+ discoSpi.blockCustomEvent();
+
+ final String topic = "topic";
+
+ IgniteFuture<UUID> starFut = ignite2.message().remoteListenAsync(topic, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() +
+ " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ msgCnt.incrementAndGet();
+
+ return true;
+ }
+ });
+
+ Assert.assertNotNull(starFut);
+
+ U.sleep(500);
+
+ Assert.assertFalse(starFut.isDone());
+
+ discoSpi.stopBlock();
+
+ UUID id = starFut.get();
+
+ Assert.assertNotNull(id);
+
+ Assert.assertTrue(starFut.isDone());
+
+ discoSpi.blockCustomEvent();
+
+ message(ignite1.cluster().forRemotes()).send(topic, "msg1");
+
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return msgCnt.get() > 0;
+ }
+ }, 5000);
+
+ assertEquals(1, msgCnt.get());
+
+ IgniteFuture<?> stopFut = ignite2.message().stopRemoteListenAsync(id);
+
+ Assert.assertNotNull(stopFut);
+
+ U.sleep(500);
+
+ Assert.assertFalse(stopFut.isDone());
+
+ discoSpi.stopBlock();
+
+ stopFut.get();
+
+ Assert.assertTrue(stopFut.isDone());
+
+ message(ignite1.cluster().forRemotes()).send(topic, "msg2");
+
+ U.sleep(1000);
+
+ assertEquals(1, msgCnt.get());
+ }
+
+ /**
*
*/
static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
@@ -1231,6 +1301,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/**
* @param expOldestIgnite Expected oldest ignite.
+ * @throws InterruptedException If failed.
*/
private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException {
ClusterGroup grp = ignite1.cluster().forOldest();