You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/27 10:20:21 UTC
[07/10] ignite git commit: IGNITE-4475: New async API: now all async
methods are defined explicitly,
IgniteAsyncSupport is deprecated. This closes #1648.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 8ffec00..8750cab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -46,6 +47,7 @@ import org.apache.ignite.transactions.TransactionState;
/**
* Cache transaction proxy.
*/
+@SuppressWarnings("unchecked")
public class TransactionProxyImpl<K, V> implements TransactionProxy, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -270,6 +272,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+ enter();
+
+ try {
+ return (IgniteFuture<Void>)createFuture(cctx.commitTxAsync(tx));
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
enter();
@@ -304,6 +318,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
+ enter();
+
+ try {
+ return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx)));
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave();
+ }
+ }
+
/**
* @param res Result to convert to finished future.
*/
@@ -315,6 +344,14 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* @param fut Internal future.
*/
private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
+ asyncRes = createFuture(fut);
+ }
+
+ /**
+ * @param fut Internal future.
+ * @return User future.
+ */
+ private IgniteFuture<?> createFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
@Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
fut.get();
@@ -323,7 +360,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
});
- asyncRes = new IgniteFutureImpl(fut0);
+ return new IgniteFutureImpl(fut0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 106ef60..b5289a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsBlockLocation;
@@ -36,6 +37,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.AsyncSupportAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -58,7 +60,7 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
/** {@inheritDoc} */
@Override public void format() {
try {
- saveOrGet(igfs.formatAsync());
+ saveOrGet(igfs.formatAsync0());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -66,10 +68,15 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
+ @Override public IgniteFuture<Void> formatAsync() throws IgniteException {
+ return igfs.formatAsync();
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
try {
- return saveOrGet(igfs.executeAsync(task, rslvr, paths, arg));
+ return saveOrGet(igfs.executeAsync0(task, rslvr, paths, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -77,10 +84,16 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ return igfs.executeAsync(task, rslvr, paths, arg);
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
try {
- return saveOrGet(igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+ return saveOrGet(igfs.executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -88,10 +101,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ return igfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg);
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
try {
- return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, arg));
+ return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -99,11 +119,17 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ return igfs.executeAsync(taskCls, rslvr, paths, arg);
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) {
try {
- return saveOrGet(igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+ return saveOrGet(igfs.executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -111,6 +137,13 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ return igfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg);
+ }
+
+ /** {@inheritDoc} */
@Override public void stop(boolean cancel) {
igfs.stop(cancel);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 7165f31..18506cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallabl
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -1430,12 +1431,17 @@ public final class IgfsImpl implements IgfsEx {
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> formatAsync() throws IgniteException {
+ return (IgniteFuture<Void>)createFuture(formatAsync0());
+ }
+
/**
* Formats the file system removing all existing entries from it.
*
* @return Future.
*/
- IgniteInternalFuture<?> formatAsync() {
+ IgniteInternalFuture<?> formatAsync0() {
GridFutureAdapter<?> fut = new GridFutureAdapter<>();
Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
@@ -1452,7 +1458,7 @@ public final class IgfsImpl implements IgfsEx {
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
try {
- return executeAsync(task, rslvr, paths, arg).get();
+ return executeAsync0(task, rslvr, paths, arg).get();
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1460,10 +1466,16 @@ public final class IgfsImpl implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ return createFuture(executeAsync0(task, rslvr, paths, arg));
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
try {
- return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get();
+ return executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get();
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1471,10 +1483,17 @@ public final class IgfsImpl implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ return createFuture(executeAsync0(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
try {
- return executeAsync(taskCls, rslvr, paths, arg).get();
+ return executeAsync0(taskCls, rslvr, paths, arg).get();
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1482,17 +1501,30 @@ public final class IgfsImpl implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ return createFuture(executeAsync0(taskCls, rslvr, paths, arg));
+ }
+
+ /** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
long maxRangeSize, @Nullable T arg) {
try {
- return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get();
+ return executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get();
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
}
}
+ /** {@inheritDoc} */
+ @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen,
+ @Nullable T arg) throws IgniteException {
+ return createFuture(executeAsync0(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+ }
+
/**
* Executes IGFS task asynchronously.
*
@@ -1502,9 +1534,9 @@ public final class IgfsImpl implements IgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ <T, R> IgniteInternalFuture<R> executeAsync0(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
- return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
+ return executeAsync0(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
}
/**
@@ -1521,7 +1553,7 @@ public final class IgfsImpl implements IgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ <T, R> IgniteInternalFuture<R> executeAsync0(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
return igfsCtx.kernalContext().task().execute(task, new IgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr,
skipNonExistentFiles, maxRangeLen, arg));
@@ -1536,9 +1568,9 @@ public final class IgfsImpl implements IgfsEx {
* @param arg Optional task argument.
* @return Execution future.
*/
- <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
- return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
+ return executeAsync0(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
}
/**
@@ -1555,7 +1587,7 @@ public final class IgfsImpl implements IgfsEx {
* @return Execution future.
*/
@SuppressWarnings("unchecked")
- <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+ <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends IgfsTask<T, R>> taskCls,
@Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
long maxRangeLen, @Nullable T arg) {
return igfsCtx.kernalContext().task().execute((Class<IgfsTask<T, R>>)taskCls,
@@ -1780,6 +1812,14 @@ public final class IgfsImpl implements IgfsEx {
}
/**
+ * @param fut Internal future.
+ * @return Public API future.
+ */
+ private <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
+ return new IgniteFutureImpl<>(fut);
+ }
+
+ /**
* IGFS thread factory.
*/
@SuppressWarnings("NullableProblems")
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 396e784..5e785e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Abstract interop target.
*/
-public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget {
+public abstract class PlatformAbstractTarget implements PlatformTarget {
/** Constant: TRUE.*/
protected static final int TRUE = 1;
@@ -73,16 +73,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- throw new IgniteCheckedException("Future listening is not supported in " + getClass());
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){
- return null;
- }
-
- /** {@inheritDoc} */
@Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
return throwUnsupported(type);
}
@@ -203,18 +193,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform
}
/**
- * Reads future information and listens.
- *
- * @param reader Reader.
- * @throws IgniteCheckedException In case of error.
- */
- protected long readAndListenFuture(BinaryRawReader reader) throws IgniteCheckedException {
- readAndListenFuture(reader, currentFuture(), null);
-
- return TRUE;
- }
-
- /**
* Wraps a listenable to be returned to platform.
*
* @param listenable Listenable.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
deleted file mode 100644
index a4d35c9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Async target.
- */
-public interface PlatformAsyncTarget {
- /**
- * Gets future for the current operation.
- *
- * @return current future.
- * @throws IgniteCheckedException If failed.
- */
- IgniteInternalFuture currentFuture() throws IgniteCheckedException;
-
- /**
- * Gets a custom future writer.
- *
- * @param opId Operation id.
- * @return A custom writer for given op id.
- */
- @Nullable PlatformFutureUtils.Writer futureWriter(int opId);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
index c2a0797..1ee57cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.platform;
-import org.apache.ignite.IgniteCheckedException;
import org.jetbrains.annotations.Nullable;
/**
@@ -106,27 +105,6 @@ public interface PlatformTargetProxy {
void inStreamAsync(int type, long memPtr) throws Exception;
/**
- * Start listening for the future.
- *
- * @param futId Future ID.
- * @param typ Result type.
- * @throws IgniteCheckedException In case of failure.
- */
- @SuppressWarnings("UnusedDeclaration")
- void listenFuture(final long futId, int typ) throws Exception;
-
- /**
- * Start listening for the future for specific operation type.
- *
- * @param futId Future ID.
- * @param typ Result type.
- * @param opId Operation ID required to pick correct result writer.
- * @throws IgniteCheckedException In case of failure.
- */
- @SuppressWarnings("UnusedDeclaration")
- void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
-
- /**
* Returns the underlying target.
*
* @return Underlying target.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
index 7e0036d..44044b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.processors.platform;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
@@ -37,6 +35,10 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
/** Underlying target. */
private final PlatformTarget target;
+ /**
+ * @param target Platform target.
+ * @param platformCtx Platform context.
+ */
public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) {
assert platformCtx != null;
assert target != null;
@@ -115,15 +117,13 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
- if (res == null) {
+ if (res == null)
throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
- }
IgniteFuture fut = res.future();
- if (fut == null) {
+ if (fut == null)
throw new IgniteException("PlatformAsyncResult.future() should not return null.");
- }
PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
/** {@inheritDoc} */
@@ -211,35 +211,11 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
}
/** {@inheritDoc} */
- @Override public void listenFuture(final long futId, int typ) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, target);
- }
-
- /** {@inheritDoc} */
- @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), target);
- }
-
- /** {@inheritDoc} */
@Override public PlatformTarget unwrap() {
return target;
}
/**
- * @return Future writer.
- */
- private PlatformFutureUtils.Writer futureWriter(int opId) {
- return ((PlatformAsyncTarget)target).futureWriter(opId);
- }
-
- /**
- * @return Current future.
- */
- private IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((PlatformAsyncTarget)target).currentFuture();
- }
-
- /**
* Wraps an object in a proxy when possible.
*
* @param obj Object to wrap.
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 2abcc0d..72f5d62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -32,7 +32,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -56,7 +55,6 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -334,9 +332,6 @@ public class PlatformCache extends PlatformAbstractTarget {
/** Initial JCache (not in binary mode). */
private final IgniteCache rawCache;
- /** Underlying JCache in async mode. */
- private final IgniteCache cacheAsync;
-
/** Whether this cache is created with "keepBinary" flag on the other side. */
private final boolean keepBinary;
@@ -386,8 +381,9 @@ public class PlatformCache extends PlatformAbstractTarget {
assert exts != null;
rawCache = cache;
+
IgniteCache binCache = cache.withKeepBinary();
- cacheAsync = binCache.withAsync();
+
this.cache = (IgniteCacheProxy)binCache;
this.keepBinary = keepBinary;
this.exts = exts;
@@ -448,12 +444,12 @@ public class PlatformCache extends PlatformAbstractTarget {
reader.readObjectDetached()) ? TRUE : FALSE;
case OP_LOC_LOAD_CACHE:
- loadCache0(reader, true, cache);
+ loadCache0(reader, true);
return TRUE;
case OP_LOAD_CACHE:
- loadCache0(reader, false, cache);
+ loadCache0(reader, false);
return TRUE;
@@ -553,66 +549,66 @@ public class PlatformCache extends PlatformAbstractTarget {
});
}
-
case OP_PUT_ASYNC: {
- cacheAsync.put(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.putAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CLEAR_CACHE_ASYNC: {
- cacheAsync.clear();
+ readAndListenFuture(reader, cache.clearAsync());
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CLEAR_ALL_ASYNC: {
- cacheAsync.clearAll(PlatformUtils.readSet(reader));
+ readAndListenFuture(reader, cache.clearAllAsync(PlatformUtils.readSet(reader)));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_REMOVE_ALL2_ASYNC: {
- cacheAsync.removeAll();
+ readAndListenFuture(reader, cache.removeAllAsync());
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_SIZE_ASYNC: {
CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
- cacheAsync.size(modes);
+ readAndListenFuture(reader, cache.sizeAsync(modes));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CLEAR_ASYNC: {
- cacheAsync.clear(reader.readObjectDetached());
+ readAndListenFuture(reader, cache.clearAsync(reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_LOAD_CACHE_ASYNC: {
- loadCache0(reader, false, cacheAsync);
+ readAndListenFuture(reader, loadCacheAsync0(reader, false));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_LOC_LOAD_CACHE_ASYNC: {
- loadCache0(reader, true, cacheAsync);
+ readAndListenFuture(reader, loadCacheAsync0(reader, true));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_PUT_ALL_ASYNC:
- cacheAsync.putAll(PlatformUtils.readMap(reader));
+ readAndListenFuture(reader, cache.putAllAsync(PlatformUtils.readMap(reader)));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REMOVE_ALL_ASYNC:
- cacheAsync.removeAll(PlatformUtils.readSet(reader));
+ readAndListenFuture(reader, cache.removeAllAsync(PlatformUtils.readSet(reader)));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REBALANCE:
readAndListenFuture(reader, cache.rebalance());
@@ -620,79 +616,81 @@ public class PlatformCache extends PlatformAbstractTarget {
return TRUE;
case OP_GET_ASYNC:
- cacheAsync.get(reader.readObjectDetached());
+ readAndListenFuture(reader, cache.getAsync(reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_CONTAINS_KEY_ASYNC:
- cacheAsync.containsKey(reader.readObjectDetached());
+ readAndListenFuture(reader, cache.containsKeyAsync(reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_CONTAINS_KEYS_ASYNC:
- cacheAsync.containsKeys(PlatformUtils.readSet(reader));
+ readAndListenFuture(reader, cache.containsKeysAsync(PlatformUtils.readSet(reader)));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REMOVE_OBJ_ASYNC:
- cacheAsync.remove(reader.readObjectDetached());
+ readAndListenFuture(reader, cache.removeAsync(reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REMOVE_BOOL_ASYNC:
- cacheAsync.remove(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.removeAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_GET_ALL_ASYNC: {
Set keys = PlatformUtils.readSet(reader);
- cacheAsync.getAll(keys);
-
- readAndListenFuture(reader, cacheAsync.future(), WRITER_GET_ALL);
+ readAndListenFuture(reader, cache.getAllAsync(keys), WRITER_GET_ALL);
return TRUE;
}
case OP_GET_AND_PUT_ASYNC:
- cacheAsync.getAndPut(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.getAndPutAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_GET_AND_PUT_IF_ABSENT_ASYNC:
- cacheAsync.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.getAndPutIfAbsentAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_GET_AND_REMOVE_ASYNC:
- cacheAsync.getAndRemove(reader.readObjectDetached());
+ readAndListenFuture(reader, cache.getAndRemoveAsync(reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_GET_AND_REPLACE_ASYNC:
- cacheAsync.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.getAndReplaceAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REPLACE_2_ASYNC:
- cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.replaceAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_REPLACE_3_ASYNC:
- cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached(),
- reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.replaceAsync(reader.readObjectDetached(), reader.readObjectDetached(),
+ reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_INVOKE_ASYNC: {
Object key = reader.readObjectDetached();
CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
- cacheAsync.invoke(key, proc);
-
- readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE);
+ readAndListenFuture(reader, cache.invokeAsync(key, proc), WRITER_INVOKE);
return TRUE;
}
@@ -702,17 +700,16 @@ public class PlatformCache extends PlatformAbstractTarget {
CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
- cacheAsync.invokeAll(keys, proc);
-
- readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE_ALL);
+ readAndListenFuture(reader, cache.invokeAllAsync(keys, proc), WRITER_INVOKE_ALL);
return TRUE;
}
case OP_PUT_IF_ABSENT_ASYNC:
- cacheAsync.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached());
+ readAndListenFuture(reader,
+ cache.putIfAbsentAsync(reader.readObjectDetached(), reader.readObjectDetached()));
- return readAndListenFuture(reader);
+ return TRUE;
case OP_INVOKE: {
Object key = reader.readObjectDetached();
@@ -807,8 +804,45 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Loads cache via localLoadCache or loadCache.
+ *
+ * @param reader Binary reader.
+ * @param loc Local flag.
+ * @return Cache async operation future.
*/
- private void loadCache0(BinaryRawReaderEx reader, boolean loc, IgniteCache cache) {
+ private void loadCache0(BinaryRawReaderEx reader, boolean loc) {
+ PlatformCacheEntryFilter filter = createPlatformCacheEntryFilter(reader);
+
+ Object[] args = readLoadCacheArgs(reader);
+
+ if (loc)
+ cache.localLoadCache(filter, args);
+ else
+ cache.loadCache(filter, args);
+ }
+
+ /**
+ * Asynchronously loads cache via localLoadCacheAsync or loadCacheAsync.
+ *
+ * @param reader Binary reader.
+ * @param loc Local flag.
+ * @return Cache async operation future.
+ */
+ private IgniteFuture<Void> loadCacheAsync0(BinaryRawReaderEx reader, boolean loc) {
+ PlatformCacheEntryFilter filter = createPlatformCacheEntryFilter(reader);
+
+ Object[] args = readLoadCacheArgs(reader);
+
+ if (loc)
+ return cache.localLoadCacheAsync(filter, args);
+ else
+ return cache.loadCacheAsync(filter, args);
+ }
+
+ /**
+ * @param reader Binary reader.
+ * @return created object.
+ */
+ @Nullable private PlatformCacheEntryFilter createPlatformCacheEntryFilter(BinaryRawReaderEx reader) {
PlatformCacheEntryFilter filter = null;
Object pred = reader.readObjectDetached();
@@ -816,6 +850,14 @@ public class PlatformCache extends PlatformAbstractTarget {
if (pred != null)
filter = platformCtx.createCacheEntryFilter(pred, 0);
+ return filter;
+ }
+
+ /**
+ * @param reader Binary reader.
+ * @return Arguments array.
+ */
+ @Nullable private Object[] readLoadCacheArgs(BinaryRawReaderEx reader) {
Object[] args = null;
int argCnt = reader.readInt();
@@ -827,10 +869,7 @@ public class PlatformCache extends PlatformAbstractTarget {
args[i] = reader.readObjectDetached();
}
- if (loc)
- cache.localLoadCache(filter, args);
- else
- cache.loadCache(filter, args);
+ return args;
}
/** {@inheritDoc} */
@@ -1130,25 +1169,6 @@ public class PlatformCache extends PlatformAbstractTarget {
}
}
- /** <inheritDoc /> */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl) cacheAsync.future()).internalFuture();
- }
-
- /** <inheritDoc /> */
- @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
- if (opId == OP_GET_ALL)
- return WRITER_GET_ALL;
-
- if (opId == OP_INVOKE)
- return WRITER_INVOKE;
-
- if (opId == OP_INVOKE_ALL)
- return WRITER_INVOKE_ALL;
-
- return null;
- }
-
/**
* Get lock by id.
*
@@ -1179,6 +1199,10 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Runs specified query.
+ *
+ * @param qry Query.
+ * @return Query cursor.
+ * @throws IgniteCheckedException On error.
*/
private PlatformQueryCursor runQuery(Query qry) throws IgniteCheckedException {
@@ -1195,6 +1219,10 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Runs specified fields query.
+ *
+ * @param qry Query.
+ * @return Query cursor.
+ * @throws IgniteCheckedException On error.
*/
private PlatformFieldsQueryCursor runFieldsQuery(Query qry)
throws IgniteCheckedException {
@@ -1211,6 +1239,10 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads the query of specified type.
+ *
+ * @param reader Binary reader.
+ * @return Query.
+ * @throws IgniteCheckedException On error.
*/
private Query readInitialQuery(BinaryRawReaderEx reader) throws IgniteCheckedException {
int typ = reader.readInt();
@@ -1234,6 +1266,9 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads sql query.
+ *
+ * @param reader Binary reader.
+ * @return Query.
*/
private Query readSqlQuery(BinaryRawReaderEx reader) {
boolean loc = reader.readBoolean();
@@ -1250,6 +1285,9 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads fields query.
+ *
+ * @param reader Binary reader.
+ * @return Query.
*/
private Query readFieldsQuery(BinaryRawReaderEx reader) {
boolean loc = reader.readBoolean();
@@ -1267,6 +1305,9 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads text query.
+ *
+ * @param reader Binary reader.
+ * @return Query.
*/
private Query readTextQuery(BinaryRawReader reader) {
boolean loc = reader.readBoolean();
@@ -1279,6 +1320,9 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads scan query.
+ *
+ * @param reader Binary reader.
+ * @return Query.
*/
private Query readScanQuery(BinaryRawReaderEx reader) {
boolean loc = reader.readBoolean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 9d9a4d2..2b2a78a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -240,9 +240,10 @@ public class PlatformCompute extends PlatformAbstractTarget {
* Execute task.
*
* @param task Task.
+ * @return Target.
*/
private PlatformTarget executeNative0(final PlatformAbstractTask task) {
- IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
+ IgniteInternalFuture fut = computeForPlatform.executeAsync0(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
private static final long serialVersionUID = 0L;
@@ -266,7 +267,9 @@ public class PlatformCompute extends PlatformAbstractTarget {
* Execute task taking arguments from the given reader.
*
* @param reader Reader.
+ * @param async Execute asynchronously flag.
* @return Task result.
+ * @throws IgniteCheckedException On error.
*/
protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) throws IgniteCheckedException {
String taskName = reader.readString();
@@ -277,18 +280,13 @@ public class PlatformCompute extends PlatformAbstractTarget {
IgniteCompute compute0 = computeForTask(nodeIds);
- if (async)
- compute0 = compute0.withAsync();
-
if (!keepBinary && arg instanceof BinaryObjectImpl)
arg = ((BinaryObject)arg).deserialize();
- Object res = compute0.execute(taskName, arg);
-
if (async)
- return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.future()));
+ return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.executeAsync(taskName, arg)));
else
- return toBinary(res);
+ return toBinary(compute0.execute(taskName, arg));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
index d4755de..cb27b19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.entityframework;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
@@ -187,11 +186,10 @@ public class PlatformDotNetEntityFrameworkCacheExtension implements PlatformCach
final ClusterGroup dataNodes = grid.cluster().forDataNodes(dataCacheName);
- IgniteCompute asyncCompute = grid.compute(dataNodes).withAsync();
+ IgniteFuture f = grid.compute(dataNodes).broadcastAsync(
+ new RemoveOldEntriesRunnable(dataCacheName, currentVersions));
- asyncCompute.broadcast(new RemoveOldEntriesRunnable(dataCacheName, currentVersions));
-
- asyncCompute.future().listen(new CleanupCompletionListener(metaCache, dataCacheName));
+ f.listen(new CleanupCompletionListener(metaCache, dataCacheName));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 9ddcc37..845c06a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.platform.events;
+import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -29,8 +29,8 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -91,9 +91,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
private final IgniteEvents events;
/** */
- private final IgniteEvents eventsAsync;
-
- /** */
private final EventResultWriter eventResWriter;
/** */
@@ -111,7 +108,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
assert events != null;
this.events = events;
- eventsAsync = events.withAsync();
eventResWriter = new EventResultWriter(platformCtx);
eventColResWriter = new EventCollectionResultWriter(platformCtx);
@@ -148,16 +144,12 @@ public class PlatformEvents extends PlatformAbstractTarget {
return TRUE;
case OP_REMOTE_QUERY_ASYNC:
- startRemoteQuery(reader, eventsAsync);
-
- readAndListenFuture(reader, currentFuture(), eventColResWriter);
+ readAndListenFuture(reader, startRemoteQueryAsync(reader, events), eventColResWriter);
return TRUE;
case OP_WAIT_FOR_LOCAL_ASYNC: {
- startWaitForLocal(reader, eventsAsync);
-
- readAndListenFuture(reader, currentFuture(), eventResWriter);
+ readAndListenFuture(reader, startWaitForLocalAsync(reader, events), eventResWriter);
return TRUE;
}
@@ -253,6 +245,23 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/**
+ * Starts the waitForLocal asynchronously.
+ *
+ * @param reader Reader
+ * @param events Events.
+ * @return Result.
+ */
+ private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) {
+ Long filterHnd = reader.readObject();
+
+ IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null;
+
+ int[] eventTypes = readEventTypes(reader);
+
+ return events.waitForLocalAsync(filter, eventTypes);
+ }
+
+ /**
* Starts the remote query.
*
* @param reader Reader.
@@ -271,6 +280,25 @@ public class PlatformEvents extends PlatformAbstractTarget {
return events.remoteQuery(filter, timeout);
}
+ /**
+ * Starts the remote query asynchronously.
+ *
+ * @param reader Reader.
+ * @param events Events.
+ * @return Result.
+ */
+ private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx reader, IgniteEvents events) {
+ Object pred = reader.readObjectDetached();
+
+ long timeout = reader.readLong();
+
+ int[] types = readEventTypes(reader);
+
+ PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
+
+ return events.remoteQueryAsync(filter, timeout);
+ }
+
/** {@inheritDoc} */
@Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
@@ -310,24 +338,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
return super.processInLongOutLong(type, val);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
- switch (opId) {
- case OP_WAIT_FOR_LOCAL:
- return eventResWriter;
-
- case OP_REMOTE_QUERY:
- return eventColResWriter;
- }
-
- return null;
- }
-
/**
* Reads event types array.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 6fe109e..8018986 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.platform.messaging;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -27,7 +26,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
import java.util.UUID;
@@ -68,9 +67,6 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/** */
private final IgniteMessaging messaging;
- /** */
- private final IgniteMessaging messagingAsync;
-
/**
* Ctor.
*
@@ -83,7 +79,6 @@ public class PlatformMessaging extends PlatformAbstractTarget {
assert messaging != null;
this.messaging = messaging;
- messagingAsync = messaging.withAsync();
}
/** {@inheritDoc} */
@@ -132,15 +127,15 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
case OP_REMOTE_LISTEN_ASYNC: {
- startRemoteListen(reader, messagingAsync);
+ readAndListenFuture(reader, startRemoteListenAsync(reader, messaging));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_STOP_REMOTE_LISTEN_ASYNC: {
- messagingAsync.stopRemoteListen(reader.readUuid());
+ readAndListenFuture(reader, messaging.stopRemoteListenAsync(reader.readUuid()));
- return readAndListenFuture(reader);
+ return TRUE;
}
default:
@@ -167,6 +162,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/**
* Starts the remote listener.
* @param reader Reader.
+ * @param messaging Messaging.
* @return Listen id.
*/
private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) {
@@ -181,9 +177,22 @@ public class PlatformMessaging extends PlatformAbstractTarget {
return messaging.remoteListen(topic, filter);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
+ /**
+ * Starts the remote listener.
+ * @param reader Reader.
+ * @param messaging Messaging.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx reader, IgniteMessaging messaging) {
+ Object nativeFilter = reader.readObjectDetached();
+
+ long ptr = reader.readLong(); // interop pointer
+
+ Object topic = reader.readObjectDetached();
+
+ PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+ return messaging.remoteListenAsync(topic, filter);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 37727f5..827bc5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.services;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -32,8 +31,8 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.internal.processors.service.GridServiceProxy;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
@@ -46,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.jetbrains.annotations.NotNull;
/**
* Interop services.
@@ -107,9 +107,6 @@ public class PlatformServices extends PlatformAbstractTarget {
/** */
private final IgniteServices services;
- /** */
- private final IgniteServices servicesAsync;
-
/** Server keep binary flag. */
private final boolean srvKeepBinary;
@@ -126,7 +123,6 @@ public class PlatformServices extends PlatformAbstractTarget {
assert services != null;
this.services = services;
- servicesAsync = services.withAsync();
this.srvKeepBinary = srvKeepBinary;
}
@@ -155,21 +151,21 @@ public class PlatformServices extends PlatformAbstractTarget {
}
case OP_DOTNET_DEPLOY_ASYNC: {
- dotnetDeploy(reader, servicesAsync);
+ readAndListenFuture(reader, dotnetDeployAsync(reader, services));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_DOTNET_DEPLOY_MULTIPLE: {
- dotnetDeployMultiple(reader, services);
+ dotnetDeployMultiple(reader);
return TRUE;
}
case OP_DOTNET_DEPLOY_MULTIPLE_ASYNC: {
- dotnetDeployMultiple(reader, servicesAsync);
+ readAndListenFuture(reader, dotnetDeployMultipleAsync(reader));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CANCEL: {
@@ -179,15 +175,15 @@ public class PlatformServices extends PlatformAbstractTarget {
}
case OP_CANCEL_ASYNC: {
- servicesAsync.cancel(reader.readString());
+ readAndListenFuture(reader, services.cancelAsync(reader.readString()));
- return readAndListenFuture(reader);
+ return TRUE;
}
case OP_CANCEL_ALL_ASYNC: {
- servicesAsync.cancelAll();
+ readAndListenFuture(reader, services.cancelAllAsync());
- return readAndListenFuture(reader);
+ return TRUE;
}
default:
@@ -350,15 +346,12 @@ public class PlatformServices extends PlatformAbstractTarget {
return super.processInStreamOutObject(type, reader);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
- return ((IgniteFutureImpl)servicesAsync.future()).internalFuture();
- }
-
/**
* Deploys multiple dotnet services.
+ *
+ * @param reader Binary reader.
*/
- private void dotnetDeployMultiple(BinaryRawReaderEx reader, IgniteServices services) {
+ private void dotnetDeployMultiple(BinaryRawReaderEx reader) {
String name = reader.readString();
Object svc = reader.readObjectDetached();
int totalCnt = reader.readInt();
@@ -369,9 +362,53 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/**
+ * Asynchronously deploys multiple dotnet services.
+ *
+ * @param reader Binary reader.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<Void> dotnetDeployMultipleAsync(BinaryRawReaderEx reader) {
+ String name = reader.readString();
+ Object svc = reader.readObjectDetached();
+ int totalCnt = reader.readInt();
+ int maxPerNodeCnt = reader.readInt();
+
+ return services.deployMultipleAsync(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary),
+ totalCnt, maxPerNodeCnt);
+ }
+
+ /**
* Deploys dotnet service.
+ *
+ * @param reader Binary reader.
+ * @param services Services.
*/
private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) {
+ ServiceConfiguration cfg = dotnetConfiguration(reader);
+
+ services.deploy(cfg);
+ }
+
+ /**
+ * Deploys dotnet service asynchronously.
+ *
+ * @param reader Binary reader.
+ * @param services Services.
+ * @return Future of the operation.
+ */
+ private IgniteFuture<Void> dotnetDeployAsync(BinaryRawReaderEx reader, IgniteServices services) {
+ ServiceConfiguration cfg = dotnetConfiguration(reader);
+
+ return services.deployAsync(cfg);
+ }
+
+ /**
+ * Read the dotnet service configuration.
+ *
+ * @param reader Binary reader,
+ * @return Service configuration.
+ */
+ @NotNull private ServiceConfiguration dotnetConfiguration(BinaryRawReaderEx reader) {
ServiceConfiguration cfg = new ServiceConfiguration();
cfg.setName(reader.readString());
@@ -386,7 +423,7 @@ public class PlatformServices extends PlatformAbstractTarget {
if (filter != null)
cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter));
- services.deploy(cfg);
+ return cfg;
}
/**
@@ -403,8 +440,8 @@ public class PlatformServices extends PlatformAbstractTarget {
/** */
private static final Map<Class<?>, Class<?>> PRIMITIVES_TO_WRAPPERS = new HashMap<>();
- /**
- * Class initializer.
+ /*
+ Class initializer.
*/
static {
PRIMITIVES_TO_WRAPPERS.put(boolean.class, Boolean.class);
@@ -422,6 +459,7 @@ public class PlatformServices extends PlatformAbstractTarget {
*
* @param proxy Proxy object.
* @param clazz Proxy class.
+ * @param ctx Platform context.
*/
private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) {
super(ctx);
@@ -435,18 +473,18 @@ public class PlatformServices extends PlatformAbstractTarget {
/**
* Invokes the proxy.
+ *
* @param mthdName Method name.
* @param srvKeepBinary Binary flag.
* @param args Args.
* @return Invocation result.
- * @throws IgniteCheckedException
- * @throws NoSuchMethodException
+ * @throws IgniteCheckedException On error.
+ * @throws NoSuchMethodException On error.
*/
public Object invoke(String mthdName, boolean srvKeepBinary, Object[] args)
throws IgniteCheckedException, NoSuchMethodException {
- if (proxy instanceof PlatformService) {
+ if (proxy instanceof PlatformService)
return ((PlatformService)proxy).invokeMethod(mthdName, srvKeepBinary, args);
- }
else {
assert proxy instanceof GridServiceProxy;
@@ -467,6 +505,7 @@ public class PlatformServices extends PlatformAbstractTarget {
* @param mthdName Name.
* @param args Args.
* @return Method.
+ * @throws NoSuchMethodException On error.
*/
private static Method getMethod(Class clazz, String mthdName, Object[] args) throws NoSuchMethodException {
assert clazz != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 21f71fa..8f34343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -196,17 +196,16 @@ public class PlatformTransactions extends PlatformAbstractTarget {
@Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
long txId = reader.readLong();
- final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+ IgniteFuture fut0;
switch (type) {
case OP_COMMIT_ASYNC:
- asyncTx.commit();
+ fut0 = tx(txId).commitAsync();
break;
-
case OP_ROLLBACK_ASYNC:
- asyncTx.rollback();
+ fut0 = tx(txId).rollbackAsync();
break;
@@ -215,7 +214,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
// Future result is the tx itself, we do not want to return it to the platform.
- IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() {
+ IgniteFuture fut = fut0.chain(new C1<IgniteFuture, Object>() {
private static final long serialVersionUID = 0L;
@Override public Object apply(IgniteFuture fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index ce74f17..7556e7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -112,26 +112,20 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
futs = new IgniteFuture[3];
if (futs[0] == null || futs[1] == null || futs[2] == null) {
- IgniteCache cache = ignite.cache(cacheName).withAsync();
+ IgniteCache cache = ignite.cache(cacheName);
if (futs[0] == null) {
- cache.size(CachePeekMode.PRIMARY);
-
- if (callAsync(cache.<Integer>future(), 0))
+ if (callAsync(cache.sizeAsync(CachePeekMode.PRIMARY), 0))
return null;
}
if (futs[1] == null) {
- cache.clear();
-
- if (callAsync(cache.<Integer>future(), 1))
+ if (callAsync(cache.clearAsync(), 1))
return null;
}
if (futs[2] == null) {
- cache.size(CachePeekMode.PRIMARY);
-
- if (callAsync(cache.<Integer>future(), 2))
+ if (callAsync(cache.sizeAsync(CachePeekMode.PRIMARY), 2))
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index a64ec6d..8f42eb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@ -370,11 +370,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
}
}
- IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync();
-
- comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+ IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids));
- fut = comp.future();
+ fut = comp.executeAsync(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
fut.listen(new CI1<IgniteFuture<Object>>() {
@Override public void apply(IgniteFuture<Object> f) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
index 50a8700..3e31b51 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
@@ -18,25 +18,75 @@
package org.apache.ignite.lang;
/**
- * Allows to enable asynchronous mode on Ignite APIs.
+ * Allows to enable asynchronous mode on Ignite APIs, e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync();
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
+ * @deprecated since 2.0. Please use specialized asynchronous methods.
*/
+@Deprecated
public interface IgniteAsyncSupport {
/**
* Gets instance of this component with asynchronous mode enabled.
*
* @return Instance of this component with asynchronous mode enabled.
+ *
+ * @deprecated since 2.0. Please use new specialized async method
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync();
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public IgniteAsyncSupport withAsync();
/**
* @return {@code True} if asynchronous mode is enabled.
+ *
+ * @deprecated since 2.0. Please use new specialized async method
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync();
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public boolean isAsync();
/**
* Gets and resets future for previous asynchronous operation.
*
* @return Future for previous asynchronous operation.
+ *
+ * @deprecated since 2.0. Please use new specialized async method
+ * e.g.
+ * <pre>
+ * IgniteFuture f = cache.getAsync();
+ * </pre>
+ * instead of old-style async API:
+ * <pre>
+ * IgniteCache asyncCache = cache.withAsync();
+ * asyncCache.get(key);
+ * IgniteFuture fut = asyncCache.future();
+ * </pre>
*/
+ @Deprecated
public <R> IgniteFuture<R> future();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
index 1bb7162..2dfea51 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupported.java
@@ -31,11 +31,13 @@ import java.lang.annotation.Target;
*
* TODO coding example.
*
+ * @deprecated since 2.0. Please use specialized asynchronous methods.
* @see IgniteAsyncSupport
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
+@Deprecated
public @interface IgniteAsyncSupported {
-
+ // No-op.
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index e2e7100..57a2b00 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -237,6 +238,19 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
public void commit() throws IgniteException;
/**
+ * Asynchronously commits this transaction by initiating {@code two-phase-commit} process.
+ *
+ * @return a Future representing pending completion of the commit.
+ * @throws IgniteException If commit failed.
+ * @throws TransactionTimeoutException If transaction is timed out.
+ * @throws TransactionRollbackException If transaction is automatically rolled back.
+ * @throws TransactionOptimisticException If transaction concurrency is {@link TransactionConcurrency#OPTIMISTIC}
+ * and commit is optimistically failed.
+ * @throws TransactionHeuristicException If transaction has entered an unknown state.
+ */
+ public IgniteFuture<Void> commitAsync() throws IgniteException;
+
+ /**
* Ends the transaction. Transaction will be rolled back if it has not been committed.
*
* @throws IgniteException If transaction could not be gracefully ended.
@@ -250,4 +264,12 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
*/
@IgniteAsyncSupported
public void rollback() throws IgniteException;
+
+ /**
+ * Asynchronously rolls back this transaction.
+ *
+ * @return a Future representing pending completion of the rollback.
+ * @throws IgniteException If rollback failed.
+ */
+ public IgniteFuture<Void> rollbackAsync() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
index 4d94400..722e37f 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/IgniteCacheExpiryStoreLoadSelfTest.java
@@ -102,13 +102,8 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends GridCacheAbstractSelfTes
keys.add(primaryKey(jcache(1)));
keys.add(primaryKey(jcache(2)));
- if (async) {
- IgniteCache<String, Integer> asyncCache = cache.withAsync();
-
- asyncCache.loadCache(null, keys.toArray(new Integer[3]));
-
- asyncCache.future().get();
- }
+ if (async)
+ cache.loadCacheAsync(null, keys.toArray(new Integer[3])).get();
else
cache.loadCache(null, keys.toArray(new Integer[3]));
@@ -143,13 +138,8 @@ public class IgniteCacheExpiryStoreLoadSelfTest extends GridCacheAbstractSelfTes
List<Integer> keys = primaryKeys(cache, 3);
- if (async) {
- IgniteCache<String, Integer> asyncCache = cache.withAsync();
-
- asyncCache.localLoadCache(null, keys.toArray(new Integer[3]));
-
- asyncCache.future().get();
- }
+ if (async)
+ cache.localLoadCacheAsync(null, keys.toArray(new Integer[3])).get();
else
cache.localLoadCache(null, keys.toArray(new Integer[3]));
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java
index 6fdaeb0..fbf938d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
@@ -356,11 +355,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If failed.
*/
private void run1(AtomicInteger cnt) throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.broadcast(runJob);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Void> fut = compute(prj).broadcastAsync(runJob);
waitForExecution(fut);
@@ -378,11 +373,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
private void run2(AtomicInteger cnt) throws Exception {
Collection<IgniteRunnable> jobs = F.asList(runJob);
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.run(jobs);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Void> fut = compute(prj).runAsync(jobs);
waitForExecution(fut);
@@ -398,11 +389,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If failed.
*/
private void call1(AtomicInteger cnt) throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.broadcast(calJob);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(calJob);
waitForExecution(fut);
@@ -418,13 +405,9 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If failed.
*/
private void call2(AtomicInteger cnt) throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
Collection<IgniteCallable<String>> jobs = F.asList(calJob);
- comp.call(jobs);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Collection<String>> fut = compute(prj).callAsync(jobs);
waitForExecution(fut);
@@ -440,11 +423,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If failed.
*/
private void call3(AtomicInteger cnt) throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.apply(clrJob, (String) null);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<String> fut = compute(prj).applyAsync(clrJob, (String) null);
waitForExecution(fut);
@@ -462,11 +441,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
private void call4(AtomicInteger cnt) throws Exception {
Collection<String> args = F.asList("a", "b", "c");
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.apply(clrJob, args);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Collection<String>> fut = compute(prj).applyAsync(clrJob, args);
waitForExecution(fut);
@@ -482,11 +457,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If failed.
*/
private void call5(AtomicInteger cnt) throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.broadcast(new TestClosure(), "arg");
-
- ComputeTaskFuture<Collection<String>> fut = comp.future();
+ IgniteFuture<Collection<String>> fut = compute(prj).broadcastAsync(new TestClosure(), "arg");
waitForExecution(fut);
@@ -509,11 +480,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
private void forkjoin1(AtomicInteger cnt) throws Exception {
Collection<String> args = F.asList("a", "b", "c");
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.apply(clrJob, args, rdc);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture fut = compute(prj).applyAsync(clrJob, args, rdc);
waitForExecution(fut);
@@ -531,11 +498,7 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
private void forkjoin2(AtomicInteger cnt) throws Exception {
Collection<IgniteCallable<String>> jobs = F.asList(calJob);
- IgniteCompute comp = compute(prj).withAsync();
-
- comp.call(jobs, rdc);
-
- ComputeTaskFuture fut = comp.future();
+ IgniteFuture<Object> fut = compute(prj).callAsync(jobs, rdc);
waitForExecution(fut);
@@ -676,26 +639,22 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
* @throws Exception If test failed.
*/
private void checkActiveFutures() throws Exception {
- IgniteCompute comp = compute(prj).withAsync();
-
- assertEquals(0, comp.activeTaskFutures().size());
+ assertEquals(0, compute(prj).activeTaskFutures().size());
cnt.set(0);
- Collection<ComputeTaskFuture<Object>> futsList = new ArrayList<>();
+ Collection<IgniteFuture<Object>> futsList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- comp.call(new TestWaitCallable<>());
-
- ComputeTaskFuture<Object> fut = comp.future();
+ IgniteFuture<Object> fut = compute(prj).callAsync(new TestWaitCallable<>());
assertFalse(fut.isDone());
- Map<IgniteUuid, ComputeTaskFuture<Object>> futs = comp.activeTaskFutures();
+ Map<IgniteUuid, ComputeTaskFuture<Object>> futs = compute(prj).activeTaskFutures();
assertEquals(i + 1, futs.size());
- assertTrue(futs.containsKey(fut.getTaskSession().getId()));
+ assertTrue(futs.containsKey(((ComputeTaskFuture)fut).getTaskSession().getId()));
futsList.add(fut);
}
@@ -706,10 +665,10 @@ public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest im
mux.notifyAll();
}
- for (ComputeTaskFuture<Object> fut : futsList)
+ for (IgniteFuture<Object> fut : futsList)
fut.get();
- assertEquals(0, comp.activeTaskFutures().size());
+ assertEquals(0, compute(prj).activeTaskFutures().size());
}
/**