You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/08 12:42:19 UTC
[1/2] ignite git commit: IGNITE-4583: new async API at the
IgniteCluster, IgniteEvents, IgniteMessaging, IgniteServices, Transaction.
Repository: ignite
Updated Branches:
refs/heads/ignite-4475-async dd4d43908 -> fde1f486d
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 37727f5..827bc5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.services;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -32,8 +31,8 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.internal.processors.service.GridServiceProxy;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
@@ -46,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
/**
* Interop services.
@@ -107,9 +107,6 @@ public class PlatformServices extends PlatformAbstractTarget {
/** */
private final IgniteServices services;
- /** */
- private final IgniteServices servicesAsync;
-
/** Server keep binary flag. */
private final boolean srvKeepBinary;
@@ -126,7 +123,6 @@ public class PlatformServices extends PlatformAbstractTarget {
assert services != null;
this.services = services;
- servicesAsync = services.withAsync();
this.srvKeepBinary = srvKeepBinary;
}
@@ -155,21 +151,21 @@ public class PlatformServices extends PlatformAbstractTarget {
}
case OP_DOTNET_DEPLOY_ASYNC: {
- dotnetDeploy(reader, servicesAsync);
+ readAndListenFuture(reader, dotnetDeployAsync(reader, services));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_DOTNET_DEPLOY_MULTIPLE: {
- dotnetDeployMultiple(reader, services);
+ dotnetDeployMultiple(reader);
return TRUE;
}
case OP_DOTNET_DEPLOY_MULTIPLE_ASYNC: {
- dotnetDeployMultiple(reader, servicesAsync);
+ readAndListenFuture(reader, dotnetDeployMultipleAsync(reader));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CANCEL: {
@@ -179,15 +175,15 @@ public class PlatformServices extends PlatformAbstractTarget {
}
case OP_CANCEL_ASYNC: {
- servicesAsync.cancel(reader.readString());
+ readAndListenFuture(reader, services.cancelAsync(reader.readString()));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CANCEL_ALL_ASYNC: {
- servicesAsync.cancelAll();
+ readAndListenFuture(reader, services.cancelAllAsync());
- return readAndListenFuture(reader);
+ return TRUE;
}
default:
@@ -350,15 +346,12 @@ public class PlatformServices extends PlatformAbstractTarget {
return super.processInStreamOutObject(type, reader);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)servicesAsync.future()).internalFuture();
- }
-
/**
* Deploys multiple dotnet services.
+ *
+ * @param reader Binary reader.
*/
- private void dotnetDeployMultiple(BinaryRawReaderEx reader, IgniteServices services) {
+ private void dotnetDeployMultiple(BinaryRawReaderEx reader) {
String name = reader.readString();
Object svc = reader.readObjectDetached();
int totalCnt = reader.readInt();
@@ -369,9 +362,53 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/**
+ * Asynchronously deploys multiple dotnet services.
+ *
+ * @param reader Binary reader.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<Void> dotnetDeployMultipleAsync(BinaryRawReaderEx reader) {
+ String name = reader.readString();
+ Object svc = reader.readObjectDetached();
+ int totalCnt = reader.readInt();
+ int maxPerNodeCnt = reader.readInt();
+
+ return services.deployMultipleAsync(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary),
+ totalCnt, maxPerNodeCnt);
+ }
+
+ /**
* Deploys dotnet service.
+ *
+ * @param reader Binary reader.
+ * @param services Services.
*/
private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) {
+ ServiceConfiguration cfg = dotnetConfiguration(reader);
+
+ services.deploy(cfg);
+ }
+
+ /**
+ * Deploys dotnet service asynchronously.
+ *
+ * @param reader Binary reader.
+ * @param services Services.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<Void> dotnetDeployAsync(BinaryRawReaderEx reader, IgniteServices services) {
+ ServiceConfiguration cfg = dotnetConfiguration(reader);
+
+ return services.deployAsync(cfg);
+ }
+
+ /**
+ * Read the dotnet service configuration.
+ *
+ * @param reader Binary reader,
+ * @return Service configuration.
+ */
+ @NotNull private ServiceConfiguration dotnetConfiguration(BinaryRawReaderEx reader) {
ServiceConfiguration cfg = new ServiceConfiguration();
cfg.setName(reader.readString());
@@ -386,7 +423,7 @@ public class PlatformServices extends PlatformAbstractTarget {
if (filter != null)
cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter));
- services.deploy(cfg);
+ return cfg;
}
/**
@@ -403,8 +440,8 @@ public class PlatformServices extends PlatformAbstractTarget {
/** */
private static final Map<Class<?>, Class<?>> PRIMITIVES_TO_WRAPPERS = new HashMap<>();
- /**
- * Class initializer.
+ /*
+ Class initializer.
*/
static {
PRIMITIVES_TO_WRAPPERS.put(boolean.class, Boolean.class);
@@ -422,6 +459,7 @@ public class PlatformServices extends PlatformAbstractTarget {
*
* @param proxy Proxy object.
* @param clazz Proxy class.
+ * @param ctx Platform context.
*/
private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) {
super(ctx);
@@ -435,18 +473,18 @@ public class PlatformServices extends PlatformAbstractTarget {
/**
* Invokes the proxy.
+ *
* @param mthdName Method name.
* @param srvKeepBinary Binary flag.
* @param args Args.
* @return Invocation result.
- * @throws IgniteCheckedException
- * @throws NoSuchMethodException
+ * @throws IgniteCheckedException On error.
+ * @throws NoSuchMethodException On error.
*/
public Object invoke(String mthdName, boolean srvKeepBinary, Object[] args)
throws IgniteCheckedException, NoSuchMethodException {
- if (proxy instanceof PlatformService) {
+ if (proxy instanceof PlatformService)
return ((PlatformService)proxy).invokeMethod(mthdName, srvKeepBinary, args);
- }
else {
assert proxy instanceof GridServiceProxy;
@@ -467,6 +505,7 @@ public class PlatformServices extends PlatformAbstractTarget {
* @param mthdName Name.
* @param args Args.
* @return Method.
+ * @throws NoSuchMethodException On error.
*/
private static Method getMethod(Class clazz, String mthdName, Object[] args) throws NoSuchMethodException {
assert clazz != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 21f71fa..8f34343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -196,17 +196,16 @@ public class PlatformTransactions extends PlatformAbstractTarget {
@Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
long txId = reader.readLong();
- final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+ IgniteFuture fut0;
switch (type) {
case OP_COMMIT_ASYNC:
- asyncTx.commit();
+ fut0 = tx(txId).commitAsync();
break;
-
case OP_ROLLBACK_ASYNC:
- asyncTx.rollback();
+ fut0 = tx(txId).rollbackAsync();
break;
@@ -215,7 +214,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
// Future result is the tx itself, we do not want to return it to the platform.
- IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() {
+ IgniteFuture fut = fut0.chain(new C1<IgniteFuture, Object>() {
private static final long serialVersionUID = 0L;
@Override public Object apply(IgniteFuture fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index e2e7100..57a2b00 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -237,6 +238,19 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
public void commit() throws IgniteException;
/**
+ * Asynchronously commits this transaction by initiating {@code two-phase-commit} process.
+ *
+ * @return a Future representing pending completion of the commit.
+ * @throws IgniteException If commit failed.
+ * @throws TransactionTimeoutException If transaction is timed out.
+ * @throws TransactionRollbackException If transaction is automatically rolled back.
+ * @throws TransactionOptimisticException If transaction concurrency is {@link TransactionConcurrency#OPTIMISTIC}
+ * and commit is optimistically failed.
+ * @throws TransactionHeuristicException If transaction has entered an unknown state.
+ */
+ public IgniteFuture<Void> commitAsync() throws IgniteException;
+
+ /**
* Ends the transaction. Transaction will be rolled back if it has not been committed.
*
* @throws IgniteException If transaction could not be gracefully ended.
@@ -250,4 +264,12 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
*/
@IgniteAsyncSupported
public void rollback() throws IgniteException;
+
+ /**
+ * Asynchronously rolls back this transaction.
+ *
+ * @return a Future representing pending completion of the rollback.
+ * @throws IgniteException If rollback failed.
+ */
+ public IgniteFuture<Void> rollbackAsync() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index 91eecbb..1d34d0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -251,6 +251,175 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testApiAsyncOld() throws Exception {
+ IgniteEvents evtAsync = grid(0).events().withAsync();
+
+ try {
+ evtAsync.stopRemoteListen(null);
+ evtAsync.future().get();
+ }
+ catch (NullPointerException ignored) {
+ // No-op.
+ }
+
+ evtAsync.stopRemoteListen(UUID.randomUUID());
+ evtAsync.future().get();
+
+ UUID consumeId = null;
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ },
+ EVTS_DISCOVERY
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ }
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+
+ try {
+ evtAsync.remoteListen(
+ new P2<UUID, Event>() {
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return false;
+ }
+ },
+ new P1<Event>() {
+ @Override public boolean apply(Event e) {
+ return false;
+ }
+ }
+ );
+
+ consumeId = (UUID)evtAsync.future().get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evtAsync.stopRemoteListen(consumeId);
+ evtAsync.future().get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testApiAsync() throws Exception {
+ IgniteEvents evt = grid(0).events();
+
+ try {
+ evt.stopRemoteListenAsync(null).get();
+ }
+ catch (NullPointerException ignored) {
+ // No-op.
+ }
+
+ evt.stopRemoteListenAsync(UUID.randomUUID()).get();
+
+ UUID consumeId = null;
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ },
+ EVTS_DISCOVERY
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, DiscoveryEvent>() {
+ @Override public boolean apply(UUID uuid, DiscoveryEvent evt) {
+ return false;
+ }
+ },
+ new P1<DiscoveryEvent>() {
+ @Override public boolean apply(DiscoveryEvent e) {
+ return false;
+ }
+ }
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+
+ try {
+ consumeId = evt.remoteListenAsync(
+ new P2<UUID, Event>() {
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return false;
+ }
+ },
+ new P1<Event>() {
+ @Override public boolean apply(Event e) {
+ return false;
+ }
+ }
+ ).get();
+
+ assertNotNull(consumeId);
+ }
+ finally {
+ evt.stopRemoteListenAsync(consumeId).get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAllEvents() throws Exception {
final Collection<UUID> nodeIds = new HashSet<>();
final AtomicInteger cnt = new AtomicInteger();
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 78fbf3b..1217005 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -265,6 +265,7 @@ public class IgfsMock implements IgfsEx {
throwUnsupported();
}
+ /** {@inheritDoc} */
@Override public IgniteFuture<Void> formatAsync() throws IgniteException {
throwUnsupported();
@@ -279,6 +280,7 @@ public class IgfsMock implements IgfsEx {
return null;
}
+ /** {@inheritDoc} */
@Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
throwUnsupported();
@@ -295,6 +297,7 @@ public class IgfsMock implements IgfsEx {
return null;
}
+ /** {@inheritDoc} */
@Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
@Nullable T arg) throws IgniteException {
@@ -311,6 +314,7 @@ public class IgfsMock implements IgfsEx {
return null;
}
+ /** {@inheritDoc} */
@Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
throwUnsupported();
@@ -327,6 +331,7 @@ public class IgfsMock implements IgfsEx {
return null;
}
+ /** {@inheritDoc} */
@Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
@Nullable T arg) throws IgniteException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
index 111cb71..21fbb45 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
@@ -287,6 +287,34 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDeployAsyncOnEachNode() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceOnEachNodeAsync";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount());
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployNodeSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testDeploySingleton() throws Exception {
Ignite g = randomGrid();
@@ -319,6 +347,34 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDeploySingletonAsync() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceSingletonAsync";
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployClusterSingletonAsync(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAffinityDeploy() throws Exception {
Ignite g = randomGrid();
@@ -348,6 +404,31 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testAffinityDeployAsync() throws Exception {
+ Ignite g = randomGrid();
+
+ final Integer affKey = 1;
+
+ // Store a cache key.
+ g.cache(CACHE_NAME).put(affKey, affKey.toString());
+
+ String name = "serviceAffinityAsync";
+
+ IgniteFuture<?> fut = g.services().deployKeyAffinitySingletonAsync(name, new AffinityService(affKey),
+ CACHE_NAME, affKey);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ checkCount(name, g.services().serviceDescriptors(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testDeployMultiple1() throws Exception {
Ignite g = randomGrid();
@@ -380,6 +461,34 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDeployMultipleAsync1() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceMultipleAsync1";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount() * 2);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), nodeCount() * 2, 3);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount() * 2, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount() * 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testDeployMultiple2() throws Exception {
Ignite g = randomGrid();
@@ -414,6 +523,36 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testDeployMultipleAsync2() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceMultipleAsync2";
+
+ int cnt = nodeCount() * 2 + 1;
+
+ CountDownLatch latch = new CountDownLatch(cnt);
+
+ DummyService.exeLatch(name, latch);
+
+ IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), cnt, 3);
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ assertEquals(name, cnt, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), cnt);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCancelSingleton() throws Exception {
Ignite g = randomGrid();
@@ -449,6 +588,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
/**
* @throws Exception If failed.
*/
+ public void testCancelSingletonAsync() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceCancelAsync";
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ g.services().deployClusterSingleton(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ latch = new CountDownLatch(1);
+
+ DummyService.cancelLatch(name, latch);
+
+ g.services().cancelAsync(name).get();
+
+ info("Cancelled service: " + name);
+
+ latch.await();
+
+ assertEquals(name, 1, DummyService.started(name));
+ assertEquals(name, 1, DummyService.cancelled(name));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCancelEachNode() throws Exception {
Ignite g = randomGrid();
@@ -482,6 +656,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
}
/**
+ * @throws Exception If failed.
+ */
+ public void testCancelAsyncEachNode() throws Exception {
+ Ignite g = randomGrid();
+
+ String name = "serviceCancelEachNodeAsync";
+
+ CountDownLatch latch = new CountDownLatch(nodeCount());
+
+ DummyService.exeLatch(name, latch);
+
+ g.services().deployNodeSingleton(name, new DummyService());
+
+ info("Deployed service: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, 0, DummyService.cancelled(name));
+
+ latch = new CountDownLatch(nodeCount());
+
+ DummyService.cancelLatch(name, latch);
+
+ g.services().cancelAsync(name).get();
+
+ info("Cancelled service: " + name);
+
+ latch.await();
+
+ assertEquals(name, nodeCount(), DummyService.started(name));
+ assertEquals(name, nodeCount(), DummyService.cancelled(name));
+ }
+
+ /**
* @param svcName Service name.
* @param descs Descriptors.
* @param cnt Expected count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index e796eb5..46ed410 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -1025,7 +1025,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
/**
* @throws Exception If failed.
*/
- public void testAsync() throws Exception {
+ public void testAsyncOld() throws Exception {
final AtomicInteger msgCnt = new AtomicInteger();
assertFalse(ignite2.message().isAsync());
@@ -1109,6 +1109,58 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAsync() throws Exception {
+ final AtomicInteger msgCnt = new AtomicInteger();
+
+ assertFalse(ignite2.message().isAsync());
+
+ final IgniteMessaging msg = ignite2.message();
+
+ final String topic = "topic";
+
+ IgniteFuture<UUID> fut = msg.remoteListenAsync(topic, new P2<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ System.out.println(Thread.currentThread().getName() +
+ " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
+
+ msgCnt.incrementAndGet();
+
+ return true;
+ }
+ });
+
+ Assert.assertNotNull(fut);
+
+ UUID id = fut.get();
+
+ Assert.assertNotNull(id);
+
+ message(ignite1.cluster().forRemotes()).send(topic, "msg1");
+
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return msgCnt.get() > 0;
+ }
+ }, 5000);
+
+ assertEquals(1, msgCnt.get());
+
+ IgniteFuture<Void> stopFut =msg.stopRemoteListenAsync(id);
+
+ Assert.assertNotNull(stopFut);
+
+ stopFut.get();
+
+ message(ignite1.cluster().forRemotes()).send(topic, "msg2");
+
+ U.sleep(1000);
+
+ assertEquals(1, msgCnt.get());
+ }
+
+ /**
* Tests that message listener registers only for one oldest node.
*
* @throws Exception If an error occurred.
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
index bb2e046..8aba684 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
@@ -547,6 +548,11 @@ public abstract class GridAbstractCacheStoreSelfTest<T extends CacheStore<Object
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
// No-op.
}
@@ -570,5 +576,10 @@ public abstract class GridAbstractCacheStoreSelfTest<T extends CacheStore<Object
@Override public void rollback() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
+ return null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 76a88d9..a0d10c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -121,12 +121,25 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart,
+ int timeout, int maxConn) throws IgniteException {
+ throw new UnsupportedOperationException("Operation is not supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterStartNodeResult> startNodes(Collection<Map<String, Object>> hosts,
@Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(
+ Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts,
+ boolean restart, int timeout, int maxConn) throws IgniteException {
+ throw new UnsupportedOperationException("Operation is not supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public void stopNodes() throws IgniteException {
throw new UnsupportedOperationException("Operation is not supported yet.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
index d5af81e..a925300 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java
@@ -59,12 +59,25 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout,
+ @Nullable int... types) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
@Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(
+ @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe,
@Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
@Nullable int... types) throws IgniteException {
@@ -72,17 +85,35 @@ public class IgniteEventsProcessProxy implements IgniteEvents {
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, long interval,
+ boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public void stopRemoteListen(UUID opId) throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter,
@Nullable int... types) throws IgniteException {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter,
+ @Nullable int... types) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) {
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
index 22c6977..5de7363 100644
--- a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
+++ b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
@@ -1051,10 +1051,6 @@ public class IgniteProjectionStartStopRestartSelfTest extends GridCommonAbstract
boolean restart,
int timeout,
int maxConn) {
- cluster = cluster.withAsync();
-
- assertNull(cluster.startNodes(hosts, dflts, restart, timeout, maxConn));
-
- return cluster.<Collection<ClusterStartNodeResult>>future().get(WAIT_TIMEOUT);
+ return cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(WAIT_TIMEOUT);
}
}
\ No newline at end of file
[2/2] ignite git commit: IGNITE-4583: new async API at the
IgniteCluster, IgniteEvents, IgniteMessaging, IgniteServices, Transaction.
Posted by vo...@apache.org.
IGNITE-4583: new async API at the IgniteCluster, IgniteEvents, IgniteMessaging, IgniteServices, Transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fde1f486
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fde1f486
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fde1f486
Branch: refs/heads/ignite-4475-async
Commit: fde1f486d6465601cfd40d0eb775141501024a28
Parents: dd4d439
Author: devozerov <vo...@gridgain.com>
Authored: Wed Feb 8 15:42:07 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Feb 8 15:42:07 2017 +0300
----------------------------------------------------------------------
.../ignite/tests/utils/TestTransaction.java | 11 +
.../java/org/apache/ignite/IgniteCluster.java | 126 +++++++++++
.../java/org/apache/ignite/IgniteEvents.java | 128 ++++++++++-
.../java/org/apache/ignite/IgniteMessaging.java | 27 +++
.../java/org/apache/ignite/IgniteServices.java | 221 +++++++++++++++++--
.../ignite/internal/IgniteEventsImpl.java | 79 +++++++
.../ignite/internal/IgniteMessagingImpl.java | 39 ++++
.../ignite/internal/IgniteServicesImpl.java | 108 +++++++++
.../cluster/IgniteClusterAsyncImpl.java | 28 ++-
.../internal/cluster/IgniteClusterImpl.java | 24 +-
.../cache/transactions/IgniteInternalTx.java | 3 -
.../transactions/TransactionProxyImpl.java | 39 +++-
...formDotNetEntityFrameworkCacheExtension.java | 1 -
.../platform/events/PlatformEvents.java | 57 +++--
.../platform/messaging/PlatformMessaging.java | 35 +--
.../platform/services/PlatformServices.java | 95 +++++---
.../transactions/PlatformTransactions.java | 9 +-
.../apache/ignite/transactions/Transaction.java | 22 ++
.../continuous/GridEventConsumeSelfTest.java | 169 ++++++++++++++
.../internal/processors/igfs/IgfsMock.java | 5 +
.../GridServiceProcessorAbstractSelfTest.java | 209 ++++++++++++++++++
.../ignite/messaging/GridMessagingSelfTest.java | 54 ++++-
.../cache/GridAbstractCacheStoreSelfTest.java | 11 +
.../multijvm/IgniteClusterProcessProxy.java | 13 ++
.../multijvm/IgniteEventsProcessProxy.java | 31 +++
...gniteProjectionStartStopRestartSelfTest.java | 6 +-
26 files changed, 1437 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index 5f3ec69..4a03d25 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -17,6 +17,7 @@
package org.apache.ignite.tests.utils;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
@@ -106,6 +107,11 @@ public class TestTransaction implements Transaction {
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
// No-op.
}
@@ -129,4 +135,9 @@ public class TestTransaction implements Transaction {
@Override public void rollback() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index 23b03df..dc7b687 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -188,6 +188,33 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
int maxConn) throws IgniteException;
/**
+ * Starts one or more nodes on remote host(s) asynchronously.
+ * <p>
+ * This method takes INI file which defines all startup parameters. It can contain one or
+ * more sections, each for a host or for range of hosts (note that they must have different
+ * names) and a special '{@code defaults}' section with default values. They are applied to
+ * undefined parameters in host's sections.
+ * <p>
+ * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and
+ * contains hostname, success flag and error message if attempt was not successful. Note that
+ * successful attempt doesn't mean that node was actually started and joined topology. For large
+ * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual
+ * node logs for details.
+ *
+ * @param file Configuration file.
+ * @param restart Whether to stop existing nodes. If {@code true}, all existing
+ * nodes on the host will be stopped before starting new ones. If
+ * {@code false}, nodes will be started only if there are less
+ * nodes on the host than expected.
+ * @param timeout Connection timeout.
+ * @param maxConn Number of parallel SSH connections to one host.
+ * @return a Future representing pending completion of the starting nodes.
+ * @throws IgniteException In case of error.
+ */
+ public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, int timeout,
+ int maxConn) throws IgniteException;
+
+ /**
* Starts one or more nodes on remote host(s).
* <p>
* Each map in {@code hosts} collection
@@ -290,6 +317,104 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
@Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException;
/**
+ * Starts one or more nodes on remote host(s) asynchronously.
+ * <p>
+ * Each map in {@code hosts} collection
+ * defines startup parameters for one host or for a range of hosts. The following
+ * parameters are supported:
+ * <table class="doctable">
+ * <tr>
+ * <th>Name</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td><b>host</b></td>
+ * <td>String</td>
+ * <td>
+ * Hostname (required). Can define several hosts if their IPs are sequential.
+ * E.g., {@code 10.0.0.1~5} defines range of five IP addresses. Other
+ * parameters are applied to all hosts equally.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td><b>port</b></td>
+ * <td>Integer</td>
+ * <td>Port number (default is {@code 22}).</td>
+ * </tr>
+ * <tr>
+ * <td><b>uname</b></td>
+ * <td>String</td>
+ * <td>Username (if not defined, current local username will be used).</td>
+ * </tr>
+ * <tr>
+ * <td><b>passwd</b></td>
+ * <td>String</td>
+ * <td>Password (if not defined, private key file must be defined).</td>
+ * </tr>
+ * <tr>
+ * <td><b>key</b></td>
+ * <td>File</td>
+ * <td>Private key file (if not defined, password must be defined).</td>
+ * </tr>
+ * <tr>
+ * <td><b>nodes</b></td>
+ * <td>Integer</td>
+ * <td>
+ * Expected number of nodes on the host. If some nodes are started
+ * already, then only remaining nodes will be started. If current count of
+ * nodes is equal to this number, and {@code restart} flag is {@code false},
+ * then nothing will happen.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td><b>igniteHome</b></td>
+ * <td>String</td>
+ * <td>
+ * Path to Ignite installation folder. If not defined, IGNITE_HOME
+ * environment variable must be set on remote hosts.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td><b>cfg</b></td>
+ * <td>String</td>
+ * <td>Path to configuration file (relative to {@code igniteHome}).</td>
+ * </tr>
+ * <tr>
+ * <td><b>script</b></td>
+ * <td>String</td>
+ * <td>
+ * Custom startup script file name and path (relative to {@code igniteHome}).
+ * You can also specify a space-separated list of parameters in the same
+ * string (for example: {@code "bin/my-custom-script.sh -v"}).
+ * </td>
+ * </tr>
+ * </table>
+ * <p>
+ * {@code dflts} map defines default values. They are applied to undefined parameters in
+ * {@code hosts} collection.
+ * <p>
+ * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and
+ * contains hostname, success flag and error message if attempt was not successful. Note that
+ * successful attempt doesn't mean that node was actually started and joined topology. For large
+ * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual
+ * node logs for details.
+ *
+ * @param hosts Startup parameters.
+ * @param dflts Default values.
+ * @param restart Whether to stop existing nodes. If {@code true}, all existing
+ * nodes on the host will be stopped before starting new ones. If
+ * {@code false}, nodes will be started only if there are less
+ * nodes on the host than expected.
+ * @param timeout Connection timeout in milliseconds.
+ * @param maxConn Number of parallel SSH connections to one host.
+ * @return a Future representing pending completion of the starting nodes.
+ * @throws IgniteException In case of error.
+ */
+ public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(Collection<Map<String, Object>> hosts,
+ @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException;
+
+ /**
* Stops nodes satisfying optional set of predicates.
* <p>
* <b>NOTE:</b> {@code System.exit(Ignition.KILL_EXIT_CODE)} will be executed on each
@@ -347,5 +472,6 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
@Nullable public IgniteFuture<?> clientReconnectFuture();
/** {@inheritDoc} */
+ @Deprecated
@Override public IgniteCluster withAsync();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
index c0e4d3b..c081f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
@@ -25,6 +25,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -90,13 +91,27 @@ public interface IgniteEvents extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Asynchronously queries nodes in this cluster group for events using passed in predicate filter for event
+ * selection.
+ *
+ * @param p Predicate filter used to query events on remote nodes.
+ * @param timeout Maximum time to wait for result, {@code 0} to wait forever.
+ * @param types Event types to be queried.
+ * @return a Future representing pending completion of the query. The completed future contains
+ * collection of grid events returned from specified nodes.
+ * @throws IgniteException If query failed.
+ */
+ public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout,
+ @Nullable int... types) throws IgniteException;
+
+ /**
* Adds event listener for specified events to all nodes in the cluster group (possibly including
* local node if it belongs to the cluster group as well). This means that all events occurring on
* any node within this cluster group that pass remote filter will be sent to local node for
* local listener notifications.
* <p>
* The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback
- * returns {@code false} or if {@link #stopRemoteListen(UUID)} is called.
+ * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called.
*
* @param locLsnr Listener callback that is called on local node. If {@code null}, this events will be handled
* on remote nodes by passed in {@code rmtFilter}.
@@ -108,7 +123,8 @@ public interface IgniteEvents extends IgniteAsyncSupport {
* @param types Types of events to listen for. If not provided, all events that pass the
* provided remote filter will be sent to local node.
* @param <T> Type of the event.
- * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening.
+ * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or
+ * {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
* @throws IgniteException If failed to add listener.
*/
@IgniteAsyncSupported
@@ -118,6 +134,35 @@ public interface IgniteEvents extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including
+ * local node if it belongs to the cluster group as well). This means that all events occurring on
+ * any node within this cluster group that pass remote filter will be sent to local node for
+ * local listener notifications.
+ * <p>
+ * The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback
+ * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called.
+ *
+ * @param <T> Type of the event.
+ * @param locLsnr Listener callback that is called on local node. If {@code null}, this events will be handled
+ * on remote nodes by passed in {@code rmtFilter}.
+ * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter
+ * will be sent to local node. If {@code null}, all events of specified types will
+ * be sent to local node. This remote filter can be used to pre-handle events remotely,
+ * before they are passed in to local callback. It will be auto-unsubsribed on the node
+ * where event occurred in case if it returns {@code false}.
+ * @param types Types of events to listen for. If not provided, all events that pass the
+ * provided remote filter will be sent to local node.
+ * @return a Future representing pending completion of the operation. The completed future contains
+ * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or
+ * {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
+ * @throws IgniteException If failed to add listener.
+ */
+ public <T extends Event> IgniteFuture<UUID> remoteListenAsync(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
+ @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types)
+ throws IgniteException;
+
+ /**
* Adds event listener for specified events to all nodes in the cluster group (possibly including
* local node if it belongs to the cluster group as well). This means that all events occurring on
* any node within this cluster group that pass remote filter will be sent to local node for
@@ -148,9 +193,11 @@ public interface IgniteEvents extends IgniteAsyncSupport {
* @param types Types of events to listen for. If not provided, all events that pass the
* provided remote filter will be sent to local node.
* @param <T> Type of the event.
- * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening.
- * @see #stopRemoteListen(UUID)
+ * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or
+ * {@link #stopRemoteListen(UUID)} methods to stop listening.
* @throws IgniteException If failed to add listener.
+ * @see #stopRemoteListen(UUID)
+ * @see #stopRemoteListenAsync(UUID)
*/
@IgniteAsyncSupported
public <T extends Event> UUID remoteListen(int bufSize,
@@ -162,6 +209,50 @@ public interface IgniteEvents extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including
+ * local node if it belongs to the cluster group as well). This means that all events occurring on
+ * any node within this cluster group that pass remote filter will be sent to local node for
+ * local listener notification.
+ *
+ * @param <T> Type of the event.
+ * @param bufSize Remote events buffer size. Events from remote nodes won't be sent until buffer
+ * is full or time interval is exceeded.
+ * @param interval Maximum time interval after which events from remote node will be sent. Events
+ * from remote nodes won't be sent until buffer is full or time interval is exceeded.
+ * @param autoUnsubscribe Flag indicating that event listeners on remote nodes should be
+ * automatically unregistered if master node (node that initiated event listening) leaves
+ * topology. If this flag is {@code false}, listeners will be unregistered only when
+ * {@link #stopRemoteListen(UUID)} method is called, or the {@code 'callback (locLsnr)'}
+ * passed in returns {@code false}.
+ * @param locLsnr Callback that is called on local node. If this predicate returns {@code true},
+ * the implementation will continue listening to events. Otherwise, events
+ * listening will be stopped and listeners will be unregistered on all nodes
+ * in the cluster group. If {@code null}, this events will be handled on remote nodes by
+ * passed in {@code rmtFilter} until local node stops (if {@code 'autoUnsubscribe'} is {@code true})
+ * or until {@link #stopRemoteListen(UUID)} is called.
+ * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter
+ * will be sent to local node. If {@code null}, all events of specified types will
+ * be sent to local node. This remote filter can be used to pre-handle events remotely,
+ * before they are passed in to local callback. It will be auto-unsubsribed on the node
+ * where event occurred in case if it returns {@code false}.
+ * @param types Types of events to listen for. If not provided, all events that pass the
+ * provided remote filter will be sent to local node.
+ * @return a Future representing pending completion of the operation. The completed future contains
+ * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)}
+ * or {@link #stopRemoteListen(UUID)} methods to stop listening.
+ * @throws IgniteException If failed to add listener.
+ * @see #stopRemoteListen(UUID)
+ * @see #stopRemoteListenAsync(UUID)
+ */
+ public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize,
+ long interval,
+ boolean autoUnsubscribe,
+ @Nullable IgniteBiPredicate<UUID, T> locLsnr,
+ @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types)
+ throws IgniteException;
+
+ /**
* Stops listening to remote events. This will unregister all listeners identified with provided
* operation ID on all nodes defined by {@link #clusterGroup()}.
* <p>
@@ -169,13 +260,27 @@ public interface IgniteEvents extends IgniteAsyncSupport {
*
* @param opId Operation ID that was returned from
* {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method.
- * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
* @throws IgniteException If failed to stop listeners.
+ * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
+ * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...)
*/
@IgniteAsyncSupported
public void stopRemoteListen(UUID opId) throws IgniteException;
/**
+ * Asynchronously stops listening to remote events. This will unregister all listeners identified with provided
+ * operation ID on all nodes defined by {@link #clusterGroup()}.
+ *
+ * @param opId Operation ID that was returned from
+ * {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to stop listeners.
+ * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
+ * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...)
+ */
+ public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException;
+
+ /**
* Waits for the specified events.
* <p>
* Supports asynchronous execution (see {@link IgniteAsyncSupport}).
@@ -191,6 +296,18 @@ public interface IgniteEvents extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Create future to wait for the specified events.
+ *
+ * @param filter Optional filtering predicate. Only if predicates evaluates to {@code true} will the event
+ * end the wait.
+ * @param types Types of the events to wait for. If not provided, all events will be passed to the filter.
+ * @return a Future representing pending completion of the operation. The completed future contains grid event.
+ * @throws IgniteException If wait was interrupted.
+ */
+ public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter,
+ @Nullable int... types) throws IgniteException;
+
+ /**
* Queries local node for events using passed-in predicate filter for event selection.
*
* @param p Predicate to filter events. All predicates must be satisfied for the
@@ -269,5 +386,6 @@ public interface IgniteEvents extends IgniteAsyncSupport {
public boolean isEnabled(int type);
/** {@inheritDoc} */
+ @Deprecated
@Override public IgniteEvents withAsync();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index ab554af..bb96d65 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
/**
@@ -150,6 +151,22 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
public UUID remoteListen(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) throws IgniteException;
/**
+ * Asynchronously adds a message listener for a given topic to all nodes in the cluster group (possibly including
+ * this node if it belongs to the cluster group as well). This means that any node within this cluster
+ * group can send a message for a given topic and all nodes within the cluster group will receive
+ * listener notifications.
+ *
+ * @param topic Topic to subscribe to, {@code null} means default topic.
+ * @param p Predicate that is called on each node for each received message. If predicate returns {@code false},
+ * then it will be unsubscribed from any further notifications.
+ * @return a Future representing pending completion of the operation. The completed future contains
+ * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening.
+ * @throws IgniteException If failed to add listener.
+ */
+ public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p)
+ throws IgniteException;
+
+ /**
* Unregisters all listeners identified with provided operation ID on all nodes in the cluster group.
* <p>
* Supports asynchronous execution (see {@link IgniteAsyncSupport}).
@@ -160,6 +177,16 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
@IgniteAsyncSupported
public void stopRemoteListen(UUID opId) throws IgniteException;
+ /**
+ * Asynchronously unregisters all listeners identified with provided operation ID on all nodes in the cluster group.
+ *
+ * @param opId Listen ID that was returned from {@link #remoteListen(Object, IgniteBiPredicate)} method.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to unregister listeners.
+ */
+ public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException;
+
/** {@inheritDoc} */
+ @Deprecated
@Override IgniteMessaging withAsync();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
index 8365ec7..1c01598 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
@@ -150,7 +151,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
* when a singleton service instance will be active on more than one node (e.g. crash detection delay).
* <p>
* This method is analogous to calling
- * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)} method.
+ * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)}
+ * method.
*
* @param name Service name.
* @param svc Service instance.
@@ -160,13 +162,35 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void deployClusterSingleton(String name, Service svc) throws IgniteException;
/**
+ * Asynchronously deploys a cluster-wide singleton service. Ignite will guarantee that there is always
+ * one instance of the service in the cluster. In case if grid node on which the service
+ * was deployed crashes or stops, Ignite will automatically redeploy it on another node.
+ * However, if the node on which the service is deployed remains in topology, then the
+ * service will always be deployed on that node only, regardless of topology changes.
+ * <p>
+ * Note that in case of topology changes, due to network delays, there may be a temporary situation
+ * when a singleton service instance will be active on more than one node (e.g. crash detection delay).
+ * <p>
+ * This method is analogous to calling
+ * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int)
+ * deployMultipleAsync(name, svc, 1, 1)} method.
+ *
+ * @param name Service name.
+ * @param svc Service instance.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to deploy service.
+ */
+ public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException;
+
+ /**
* Deploys a per-node singleton service. Ignite will guarantee that there is always
* one instance of the service running on each node. Whenever new nodes are started
* within the underlying cluster group, Ignite will automatically deploy one instance of
* the service on every new node.
* <p>
* This method is analogous to calling
- * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)} method.
+ * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)}
+ * method.
*
* @param name Service name.
* @param svc Service instance.
@@ -176,6 +200,23 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void deployNodeSingleton(String name, Service svc) throws IgniteException;
/**
+ * Asynchronously deploys a per-node singleton service. Ignite will guarantee that there is always
+ * one instance of the service running on each node. Whenever new nodes are started
+ * within the underlying cluster group, Ignite will automatically deploy one instance of
+ * the service on every new node.
+ * <p>
+ * This method is analogous to calling
+ * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int)
+ * deployMultipleAsync(name, svc, 0, 1)} method.
+ *
+ * @param name Service name.
+ * @param svc Service instance.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to deploy service.
+ */
+ public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException;
+
+ /**
* Deploys one instance of this service on the primary node for a given affinity key.
* Whenever topology changes and primary node assignment changes, Ignite will always
* make sure that the service is undeployed on the previous primary node and deployed
@@ -184,8 +225,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
* Note that in case of topology changes, due to network delays, there may be a temporary situation
* when a service instance will be active on more than one node (e.g. crash detection delay).
* <p>
- * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method
- * as follows:
+ * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)}
+ * method as follows:
* <pre name="code" class="java">
* ServiceConfiguration cfg = new ServiceConfiguration();
*
@@ -211,6 +252,41 @@ public interface IgniteServices extends IgniteAsyncSupport {
throws IgniteException;
/**
+ * Asynchronously deploys one instance of this service on the primary node for a given affinity key.
+ * Whenever topology changes and primary node assignment changes, Ignite will always
+ * make sure that the service is undeployed on the previous primary node and deployed
+ * on the new primary node.
+ * <p>
+ * Note that in case of topology changes, due to network delays, there may be a temporary situation
+ * when a service instance will be active on more than one node (e.g. crash detection delay).
+ * <p>
+ * This method is analogous to the invocation of
+ * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows:
+ * <pre name="code" class="java">
+ * ServiceConfiguration cfg = new ServiceConfiguration();
+ *
+ * cfg.setName(name);
+ * cfg.setService(svc);
+ * cfg.setCacheName(cacheName);
+ * cfg.setAffinityKey(affKey);
+ * cfg.setTotalCount(1);
+ * cfg.setMaxPerNodeCount(1);
+ *
+ * ignite.services().deployAsync(cfg);
+ * </pre>
+ *
+ * @param name Service name.
+ * @param svc Service instance.
+ * @param cacheName Name of the cache on which affinity for key should be calculated, {@code null} for
+ * default cache.
+ * @param affKey Affinity cache key.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to deploy service.
+ */
+ public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, @Nullable String cacheName,
+ Object affKey) throws IgniteException;
+
+ /**
* Deploys multiple instances of the service on the grid. Ignite will deploy a
* maximum amount of services equal to {@code 'totalCnt'} parameter making sure that
* there are no more than {@code 'maxPerNodeCnt'} service instances running
@@ -221,8 +297,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
* Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have
* value greater than {@code 0}.
* <p>
- * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method
- * as follows:
+ * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)}
+ * method as follows:
* <pre name="code" class="java">
* ServiceConfiguration cfg = new ServiceConfiguration();
*
@@ -244,20 +320,57 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) throws IgniteException;
/**
+ * Asynchronously deploys multiple instances of the service on the grid. Ignite will deploy a
+ * maximum amount of services equal to {@code 'totalCnt'} parameter making sure that
+ * there are no more than {@code 'maxPerNodeCnt'} service instances running
+ * on each node. Whenever topology changes, Ignite will automatically rebalance
+ * the deployed services within cluster to make sure that each node will end up with
+ * about equal number of deployed instances whenever possible.
+ * <p>
+ * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have
+ * value greater than {@code 0}.
+ * <p>
+ * This method is analogous to the invocation of
+ * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows:
+ * <pre name="code" class="java">
+ * ServiceConfiguration cfg = new ServiceConfiguration();
+ *
+ * cfg.setName(name);
+ * cfg.setService(svc);
+ * cfg.setTotalCount(totalCnt);
+ * cfg.setMaxPerNodeCount(maxPerNodeCnt);
+ *
+ * ignite.services().deployAsync(cfg);
+ * </pre>
+ *
+ * @param name Service name.
+ * @param svc Service instance.
+ * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited.
+ * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to deploy service.
+ */
+ public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt)
+ throws IgniteException;
+
+ /**
* Deploys multiple instances of the service on the grid according to provided
* configuration. Ignite will deploy a maximum amount of services equal to
* {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter
- * making sure that there are no more than {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()}
+ * making sure that there are no more than
+ * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()}
* service instances running on each node. Whenever topology changes, Ignite will automatically rebalance
* the deployed services within cluster to make sure that each node will end up with
* about equal number of deployed instances whenever possible.
* <p>
- * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} is not {@code null}, then Ignite
- * will deploy the service on the primary node for given affinity key. The affinity will be calculated
- * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name.
+ * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()}
+ * is not {@code null}, then Ignite will deploy the service on the primary node for given affinity key.
+ * The affinity will be calculated on the cache with
+ * {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name.
* <p>
- * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} is not {@code null}, then
- * Ignite will deploy service on all grid nodes for which the provided filter evaluates to {@code true}.
+ * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()}
+ * is not {@code null}, then Ignite will deploy service on all grid nodes for which
+ * the provided filter evaluates to {@code true}.
* The node filter will be checked in addition to the underlying cluster group filter, or the
* whole grid, if the underlying cluster group includes all the cluster nodes.
* <p>
@@ -283,12 +396,56 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void deploy(ServiceConfiguration cfg) throws IgniteException;
/**
+ * Asynchronously deploys multiple instances of the service on the grid according to provided
+ * configuration. Ignite will deploy a maximum amount of services equal to
+ * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter
+ * making sure that there are no more than
+ * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()}
+ * service instances running on each node. Whenever topology changes, Ignite will automatically rebalance
+ * the deployed services within cluster to make sure that each node will end up with
+ * about equal number of deployed instances whenever possible.
+ * <p>
+ * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()}
+ * is not {@code null}, then Ignite
+ * will deploy the service on the primary node for given affinity key. The affinity will be calculated
+ * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name.
+ * <p>
+ * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()}
+ * is not {@code null}, then Ignite will deploy service on all grid nodes
+ * for which the provided filter evaluates to {@code true}.
+ * The node filter will be checked in addition to the underlying cluster group filter, or the
+ * whole grid, if the underlying cluster group includes all the cluster nodes.
+ * <p>
+ * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have
+ * value greater than {@code 0}.
+ * <p>
+ * Here is an example of creating service deployment configuration:
+ * <pre name="code" class="java">
+ * ServiceConfiguration cfg = new ServiceConfiguration();
+ *
+ * cfg.setName(name);
+ * cfg.setService(svc);
+ * cfg.setTotalCount(0); // Unlimited.
+ * cfg.setMaxPerNodeCount(2); // Deploy 2 instances of service on each node.
+ *
+ * ignite.services().deployAsync(cfg);
+ * </pre>
+ *
+ * @param cfg Service configuration.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to deploy service.
+ */
+ public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException;
+
+ /**
* Cancels service deployment. If a service with specified name was deployed on the grid,
- * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} method will be called on it.
+ * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+ * method will be called on it.
* <p>
- * Note that Ignite cannot guarantee that the service exits from {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
- * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} is called. It is up to the user to
- * make sure that the service code properly reacts to cancellations.
+ * Note that Ignite cannot guarantee that the service exits from
+ * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
+ * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+ * is called. It is up to the user to make sure that the service code properly reacts to cancellations.
* <p>
* Supports asynchronous execution (see {@link IgniteAsyncSupport}).
*
@@ -299,6 +456,23 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void cancel(String name) throws IgniteException;
/**
+ * Asynchronously cancels service deployment. If a service with specified name was deployed on the grid,
+ * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+ * method will be called on it.
+ * <p>
+ * Note that Ignite cannot guarantee that the service exits from
+ * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
+ * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+ * is called. It is up to the user to
+ * make sure that the service code properly reacts to cancellations.
+ *
+ * @param name Name of service to cancel.
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to cancel service.
+ */
+ public IgniteFuture<Void> cancelAsync(String name) throws IgniteException;
+
+ /**
* Cancels all deployed services.
* <p>
* Note that depending on user logic, it may still take extra time for a service to
@@ -312,6 +486,17 @@ public interface IgniteServices extends IgniteAsyncSupport {
public void cancelAll() throws IgniteException;
/**
+ * Asynchronously cancels all deployed services.
+ * <p>
+ * Note that depending on user logic, it may still take extra time for a service to
+ * finish execution, even after it was cancelled.
+ *
+ * @return a Future representing pending completion of the operation.
+ * @throws IgniteException If failed to cancel services.
+ */
+ public IgniteFuture<Void> cancelAllAsync() throws IgniteException;
+
+ /**
* Gets metadata about all deployed services in the grid.
*
* @return Metadata about all deployed services in the grid.
@@ -364,8 +549,10 @@ public interface IgniteServices extends IgniteAsyncSupport {
* @return Either proxy over remote service or local service if it is deployed locally.
* @throws IgniteException If failed to create service proxy.
*/
- public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) throws IgniteException;
+ public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout)
+ throws IgniteException;
/** {@inheritDoc} */
+ @Deprecated
@Override public IgniteServices withAsync();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 3c6218d..9acccab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -27,13 +27,16 @@ import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -93,12 +96,34 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout,
+ @Nullable int... types) throws IgniteException {
+
+ guard();
+
+ try {
+ return new IgniteFutureImpl<>(ctx.event().remoteEventsAsync(compoundPredicate(p, types),
+ prj.nodes(), timeout));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
@Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) {
return remoteListen(1, 0, true, locLsnr, rmtFilter, types);
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(
+ @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types) throws IgniteException {
+ return remoteListenAsync(1, 0, true, locLsnr, rmtFilter, types);
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> UUID remoteListen(int bufSize, long interval,
boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
@Nullable int... types) {
@@ -128,6 +153,32 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, long interval,
+ boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types) throws IgniteException {
+ A.ensure(bufSize > 0, "bufSize > 0");
+ A.ensure(interval >= 0, "interval >= 0");
+
+ guard();
+
+ try {
+ GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
+ (IgnitePredicate<Event>)rmtFilter, types);
+
+ return new IgniteFutureImpl<>(ctx.continuous().startRoutine(
+ hnd,
+ false,
+ bufSize,
+ interval,
+ autoUnsubscribe,
+ prj.predicate()));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void stopRemoteListen(UUID opId) {
A.notNull(opId, "consumeId");
@@ -145,6 +196,21 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException {
+ A.notNull(opId, "consumeId");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter,
@Nullable int... types) {
guard();
@@ -161,6 +227,19 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
}
/** {@inheritDoc} */
+ @Override public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter,
+ @Nullable int... types) throws IgniteException {
+ guard();
+
+ try {
+ return new IgniteFutureImpl<>(ctx.event().waitForEvent(filter, types));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) {
A.notNull(p, "p");
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 2800777..e5c00bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -25,15 +25,18 @@ import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
/**
@@ -200,6 +203,28 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic,
+ IgniteBiPredicate<UUID, ?> p) throws IgniteException {
+ A.notNull(p, "p");
+
+ guard();
+
+ try {
+ GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p);
+
+ return new IgniteFutureImpl<>(ctx.continuous().startRoutine(hnd,
+ false,
+ 1,
+ 0,
+ false,
+ prj.predicate()));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void stopRemoteListen(UUID opId) {
A.notNull(opId, "opId");
@@ -216,6 +241,20 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException {
+ A.notNull(opId, "opId");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId));
+ }
+ finally {
+ unguard();
+ }
+ }
+
/**
* <tt>ctx.gateway().readLock()</tt>
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index df6e5df..607dccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -28,8 +28,10 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;
@@ -38,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
/**
* {@link org.apache.ignite.IgniteServices} implementation.
*/
+@SuppressWarnings("unchecked")
public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteServices, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -91,6 +94,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException {
+ A.notNull(name, "name");
+ A.notNull(svc, "svc");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployNodeSingleton(prj, name, svc));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void deployClusterSingleton(String name, Service svc) {
A.notNull(name, "name");
A.notNull(svc, "svc");
@@ -109,6 +127,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException {
+ A.notNull(name, "name");
+ A.notNull(svc, "svc");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployClusterSingleton(prj, name, svc));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) {
A.notNull(name, "name");
A.notNull(svc, "svc");
@@ -127,6 +160,23 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt,
+ int maxPerNodeCnt) throws IgniteException {
+ A.notNull(name, "name");
+ A.notNull(svc, "svc");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployMultiple(prj, name, svc,
+ totalCnt, maxPerNodeCnt));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void deployKeyAffinitySingleton(String name, Service svc, @Nullable String cacheName,
Object affKey) {
A.notNull(name, "name");
@@ -147,6 +197,24 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc,
+ @Nullable String cacheName, Object affKey) throws IgniteException {
+ A.notNull(name, "name");
+ A.notNull(svc, "svc");
+ A.notNull(affKey, "affKey");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployKeyAffinitySingleton(name, svc,
+ cacheName, affKey));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void deploy(ServiceConfiguration cfg) {
A.notNull(cfg, "cfg");
@@ -164,6 +232,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException {
+ A.notNull(cfg, "cfg");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deploy(cfg));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void cancel(String name) {
A.notNull(name, "name");
@@ -181,6 +263,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> cancelAsync(String name) throws IgniteException {
+ A.notNull(name, "name");
+
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancel(name));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void cancelAll() {
guard();
@@ -196,6 +292,18 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> cancelAllAsync() throws IgniteException {
+ guard();
+
+ try {
+ return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancelAll());
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ServiceDescriptor> serviceDescriptors() {
guard();
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index fb9b190..d392813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -30,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -115,7 +115,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
int maxConn)
{
try {
- return saveOrGet(cluster.startNodesAsync(file, restart, timeout, maxConn));
+ return saveOrGet(cluster.startNodesAsync0(file, restart, timeout, maxConn));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -123,6 +123,12 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart,
+ int timeout, int maxConn) throws IgniteException {
+ return cluster.startNodesAsync(file, restart, timeout, maxConn);
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterStartNodeResult> startNodes(
Collection<Map<String, Object>> hosts,
@Nullable Map<String, Object> dflts,
@@ -131,7 +137,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
int maxConn)
{
try {
- return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn));
+ return saveOrGet(cluster.startNodesAsync0(hosts, dflts, restart, timeout, maxConn));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -139,6 +145,13 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(
+ Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts,
+ boolean restart, int timeout, int maxConn) throws IgniteException {
+ return cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn);
+ }
+
+ /** {@inheritDoc} */
@Override public void stopNodes() {
cluster.stopNodes();
}
@@ -312,13 +325,4 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(cluster);
}
-
- /**
- * @return Cluster async instance.
- *
- * @throws ObjectStreamException If failed.
- */
- protected Object readResolve() throws ObjectStreamException {
- return cluster.withAsync();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index aa5e63f..857c1ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.nodestart.IgniteRemoteStartSpecification;
import org.apache.ignite.internal.util.nodestart.IgniteSshHelper;
import org.apache.ignite.internal.util.nodestart.StartNodeCallable;
@@ -222,7 +223,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
throws IgniteException
{
try {
- return startNodesAsync(file, restart, timeout, maxConn).get();
+ return startNodesAsync0(file, restart, timeout, maxConn).get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -230,6 +231,12 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart,
+ int timeout, int maxConn) throws IgniteException {
+ return new IgniteFutureImpl<>(startNodesAsync0(file, restart, timeout, maxConn));
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterStartNodeResult> startNodes(Collection<Map<String, Object>> hosts,
@Nullable Map<String, Object> dflts,
boolean restart,
@@ -238,7 +245,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
throws IgniteException
{
try {
- return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
+ return startNodesAsync0(hosts, dflts, restart, timeout, maxConn).get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -246,6 +253,13 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(
+ Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts,
+ boolean restart, int timeout, int maxConn) throws IgniteException {
+ return new IgniteFutureImpl<>(startNodesAsync0(hosts, dflts, restart, timeout, maxConn));
+ }
+
+ /** {@inheritDoc} */
@Override public void stopNodes() throws IgniteException {
guard();
@@ -330,7 +344,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
* @return Future with results.
* @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
*/
- IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file,
+ IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file,
boolean restart,
int timeout,
int maxConn)
@@ -342,7 +356,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
try {
IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file);
- return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn);
+ return startNodesAsync0(t.get1(), t.get2(), restart, timeout, maxConn);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -358,7 +372,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
* @return Future with results.
* @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)
*/
- IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync(
+ IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(
Collection<Map<String, Object>> hosts,
@Nullable Map<String, Object> dflts,
boolean restart,
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..30f5c5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -187,7 +186,6 @@ public interface IgniteInternalTx extends AutoCloseable {
*
* @throws IgniteCheckedException If commit failed.
*/
- @IgniteAsyncSupported
public void commit() throws IgniteCheckedException;
/**
@@ -202,7 +200,6 @@ public interface IgniteInternalTx extends AutoCloseable {
*
* @throws IgniteCheckedException If rollback failed.
*/
- @IgniteAsyncSupported
public void rollback() throws IgniteCheckedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 6134b9f..2a058ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -45,6 +46,7 @@ import org.apache.ignite.transactions.TransactionState;
/**
* Cache transaction proxy.
*/
+@SuppressWarnings("unchecked")
public class TransactionProxyImpl<K, V> implements TransactionProxy, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -269,6 +271,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+ enter();
+
+ try {
+ return (IgniteFuture<Void>)createFuture(cctx.commitTxAsync(tx));
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
enter();
@@ -303,6 +317,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
+ enter();
+
+ try {
+ return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx)));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave();
+ }
+ }
+
/**
* @param res Result to convert to finished future.
*/
@@ -314,13 +343,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* @param fut Internal future.
*/
private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
+ asyncRes = createFuture(fut);
+ }
+
+ /**
+ * @param fut Internal future.
+ * @return User future.
+ */
+ private IgniteFuture<?> createFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
@Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
return fut.get().proxy();
}
});
- asyncRes = new IgniteFutureImpl(fut0);
+ return new IgniteFutureImpl(fut0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
index eb675fb..cb27b19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.entityframework;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 9ddcc37..e16abe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.platform.events;
+import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -29,8 +29,8 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -91,9 +91,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
private final IgniteEvents events;
/** */
- private final IgniteEvents eventsAsync;
-
- /** */
private final EventResultWriter eventResWriter;
/** */
@@ -111,7 +108,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
assert events != null;
this.events = events;
- eventsAsync = events.withAsync();
eventResWriter = new EventResultWriter(platformCtx);
eventColResWriter = new EventCollectionResultWriter(platformCtx);
@@ -148,16 +144,12 @@ public class PlatformEvents extends PlatformAbstractTarget {
return TRUE;
case OP_REMOTE_QUERY_ASYNC:
- startRemoteQuery(reader, eventsAsync);
-
- readAndListenFuture(reader, currentFuture(), eventColResWriter);
+ readAndListenFuture(reader, startRemoteQueryAsync(reader, events), eventColResWriter);
return TRUE;
case OP_WAIT_FOR_LOCAL_ASYNC: {
- startWaitForLocal(reader, eventsAsync);
-
- readAndListenFuture(reader, currentFuture(), eventResWriter);
+ readAndListenFuture(reader, startWaitForLocalAsync(reader, events), eventResWriter);
return TRUE;
}
@@ -253,6 +245,23 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/**
+ * Starts the waitForLocal asynchronously.
+ *
+ * @param reader Reader
+ * @param events Events.
+ * @return Result.
+ */
+ private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) {
+ Long filterHnd = reader.readObject();
+
+ IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null;
+
+ int[] eventTypes = readEventTypes(reader);
+
+ return events.waitForLocalAsync(filter, eventTypes);
+ }
+
+ /**
* Starts the remote query.
*
* @param reader Reader.
@@ -271,6 +280,25 @@ public class PlatformEvents extends PlatformAbstractTarget {
return events.remoteQuery(filter, timeout);
}
+ /**
+ * Starts the remote query asynchronously.
+ *
+ * @param reader Reader.
+ * @param events Events.
+ * @return Result.
+ */
+ private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx reader, IgniteEvents events) {
+ Object pred = reader.readObjectDetached();
+
+ long timeout = reader.readLong();
+
+ int[] types = readEventTypes(reader);
+
+ PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
+
+ return events.remoteQueryAsync(filter, timeout);
+ }
+
/** {@inheritDoc} */
@Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
@@ -311,11 +339,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
- }
-
- /** {@inheritDoc} */
@Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
switch (opId) {
case OP_WAIT_FOR_LOCAL:
http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 6fe109e..8018986 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.platform.messaging;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -27,7 +26,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
import java.util.UUID;
@@ -68,9 +67,6 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/** */
private final IgniteMessaging messaging;
- /** */
- private final IgniteMessaging messagingAsync;
-
/**
* Ctor.
*
@@ -83,7 +79,6 @@ public class PlatformMessaging extends PlatformAbstractTarget {
assert messaging != null;
this.messaging = messaging;
- messagingAsync = messaging.withAsync();
}
/** {@inheritDoc} */
@@ -132,15 +127,15 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
case OP_REMOTE_LISTEN_ASYNC: {
- startRemoteListen(reader, messagingAsync);
+ readAndListenFuture(reader, startRemoteListenAsync(reader, messaging));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_STOP_REMOTE_LISTEN_ASYNC: {
- messagingAsync.stopRemoteListen(reader.readUuid());
+ readAndListenFuture(reader, messaging.stopRemoteListenAsync(reader.readUuid()));
- return readAndListenFuture(reader);
+ return TRUE;
}
default:
@@ -167,6 +162,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/**
* Starts the remote listener.
* @param reader Reader.
+ * @param messaging Messaging.
* @return Listen id.
*/
private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) {
@@ -181,9 +177,22 @@ public class PlatformMessaging extends PlatformAbstractTarget {
return messaging.remoteListen(topic, filter);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
+ /**
+ * Starts the remote listener.
+ * @param reader Reader.
+ * @param messaging Messaging.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx reader, IgniteMessaging messaging) {
+ Object nativeFilter = reader.readObjectDetached();
+
+ long ptr = reader.readLong(); // interop pointer
+
+ Object topic = reader.readObjectDetached();
+
+ PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+ return messaging.remoteListenAsync(topic, filter);
}
/** {@inheritDoc} */