You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/04 08:28:27 UTC
[41/50] [abbrv] ignite git commit: IGNITE-2228: .NET: Compute futures could be cancelled.
IGNITE-2228: .NET: Compute futures could be cancelled.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24a78f5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24a78f5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24a78f5d
Branch: refs/heads/ignite-2218
Commit: 24a78f5de27908f608cf53915064164d25c0f633
Parents: e661f17
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Dec 30 13:51:32 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Dec 30 13:51:32 2015 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 17 +-
.../processors/platform/PlatformTarget.java | 22 ++
.../platform/compute/PlatformCompute.java | 34 ++-
.../platform/utils/PlatformFutureUtils.java | 119 +++++++---
.../platform/utils/PlatformListenable.java | 47 ++++
.../cpp/common/include/ignite/common/exports.h | 7 +-
.../cpp/common/include/ignite/common/java.h | 13 +-
.../platforms/cpp/common/project/vs/module.def | 6 +-
modules/platforms/cpp/common/src/exports.cpp | 20 +-
modules/platforms/cpp/common/src/java.cpp | 70 +++++-
.../Compute/ComputeApiTest.cs | 20 ++
.../Apache.Ignite.Core.csproj | 2 +
.../Common/IgniteFutureCancelledException.cs | 65 ++++++
.../Apache.Ignite.Core/Compute/ICompute.cs | 233 +++++++++++++++++++
.../Impl/Common/CancelledTask.cs | 47 ++++
.../Apache.Ignite.Core/Impl/Common/Future.cs | 74 +++++-
.../Apache.Ignite.Core/Impl/Compute/Compute.cs | 157 ++++++++++++-
.../Impl/Compute/ComputeImpl.cs | 15 +-
.../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 4 +
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 52 +++++
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 16 +-
.../Impl/Unmanaged/UnmanagedUtils.cs | 31 ++-
22 files changed, 995 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 34a2cca..7ffceef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.*;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
@@ -184,12 +184,23 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
/** {@inheritDoc} */
@Override public void listenFuture(final long futId, int typ) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
+ listenFutureAndGet(futId, typ);
}
/** {@inheritDoc} */
@Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
+ listenFutureForOperationAndGet(futId, typ, opId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception {
+ return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId)
+ throws Exception {
+ return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index bf657d1..1ebf700 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.platform.utils.*;
import org.jetbrains.annotations.Nullable;
/**
@@ -113,4 +114,25 @@ public interface PlatformTarget {
*/
@SuppressWarnings("UnusedDeclaration")
public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
+
+ /**
+ * Start listening for the future.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @throws IgniteCheckedException In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception;
+
+ /**
+ * Start listening for the future for specific operation type.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @param opId Operation ID required to pick correct result writer.
+ * @throws IgniteCheckedException In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 9ef6b5e..1dad126 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.utils.*;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
@@ -75,36 +76,31 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
switch (type) {
case OP_UNICAST:
- processClosures(reader.readLong(), reader, false, false);
-
- return TRUE;
+ return processClosures(reader.readLong(), reader, false, false);
case OP_BROADCAST:
- processClosures(reader.readLong(), reader, true, false);
-
- return TRUE;
+ return processClosures(reader.readLong(), reader, true, false);
case OP_AFFINITY:
- processClosures(reader.readLong(), reader, false, true);
-
- return TRUE;
+ return processClosures(reader.readLong(), reader, false, true);
default:
- return super.processInStreamOutLong(type, reader);
+ return super.processInStreamOutObject(type, reader);
}
}
/**
* Process closure execution request.
- *
- * @param taskPtr Task pointer.
+ * @param taskPtr Task pointer.
* @param reader Reader.
* @param broadcast broadcast flag.
*/
- private void processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, boolean affinity) {
+ private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast,
+ boolean affinity) {
PlatformAbstractTask task;
int size = reader.readInt();
@@ -155,7 +151,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
- executeNative0(task);
+ return executeNative0(task);
}
/**
@@ -194,10 +190,10 @@ public class PlatformCompute extends PlatformAbstractTarget {
* @param taskPtr Pointer to the task.
* @param topVer Topology version.
*/
- public void executeNative(long taskPtr, long topVer) {
+ public PlatformListenable executeNative(long taskPtr, long topVer) {
final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
- executeNative0(task);
+ return executeNative0(task);
}
/**
@@ -231,7 +227,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
*
* @param task Task.
*/
- private void executeNative0(final PlatformAbstractTask task) {
+ private PlatformListenable executeNative0(final PlatformAbstractTask task) {
IgniteInternalFuture fut = compute.executeAsync(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
@@ -248,6 +244,8 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
}
});
+
+ return PlatformFutureUtils.getListenable(fut);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index e6f28c9..7a86201 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.utils;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -67,10 +68,15 @@ public class PlatformFutureUtils {
* @param fut Java future.
* @param futPtr Native future pointer.
* @param typ Expected return type.
+ * @return Resulting listenable.
*/
- public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
- PlatformAbstractTarget target) {
- listen(ctx, new InternalFutureListenable(fut), futPtr, typ, null, target);
+ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
+ final int typ, PlatformAbstractTarget target) {
+ PlatformListenable listenable = getListenable(fut);
+
+ listen(ctx, listenable, futPtr, typ, null, target);
+
+ return listenable;
}
/**
* Listen future.
@@ -79,10 +85,15 @@ public class PlatformFutureUtils {
* @param fut Java future.
* @param futPtr Native future pointer.
* @param typ Expected return type.
+ * @return Resulting listenable.
*/
- public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
- PlatformAbstractTarget target) {
- listen(ctx, new FutureListenable(fut), futPtr, typ, null, target);
+ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
+ final int typ, PlatformAbstractTarget target) {
+ PlatformListenable listenable = getListenable(fut);
+
+ listen(ctx, listenable, futPtr, typ, null, target);
+
+ return listenable;
}
/**
@@ -93,10 +104,15 @@ public class PlatformFutureUtils {
* @param futPtr Native future pointer.
* @param typ Expected return type.
* @param writer Writer.
+ * @return Resulting listenable.
*/
- public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
- Writer writer, PlatformAbstractTarget target) {
- listen(ctx, new InternalFutureListenable(fut), futPtr, typ, writer, target);
+ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
+ final int typ, Writer writer, PlatformAbstractTarget target) {
+ PlatformListenable listenable = getListenable(fut);
+
+ listen(ctx, listenable, futPtr, typ, writer, target);
+
+ return listenable;
}
/**
@@ -107,10 +123,15 @@ public class PlatformFutureUtils {
* @param futPtr Native future pointer.
* @param typ Expected return type.
* @param writer Writer.
+ * @return Resulting listenable.
*/
- public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
- Writer writer, PlatformAbstractTarget target) {
- listen(ctx, new FutureListenable(fut), futPtr, typ, writer, target);
+ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
+ final int typ, Writer writer, PlatformAbstractTarget target) {
+ PlatformListenable listenable = getListenable(fut);
+
+ listen(ctx, listenable, futPtr, typ, writer, target);
+
+ return listenable;
}
/**
@@ -120,10 +141,35 @@ public class PlatformFutureUtils {
* @param fut Java future.
* @param futPtr Native future pointer.
* @param writer Writer.
+ * @return Resulting listenable.
+ */
+ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
+ Writer writer, PlatformAbstractTarget target) {
+ PlatformListenable listenable = getListenable(fut);
+
+ listen(ctx, listenable, futPtr, TYP_OBJ, writer, target);
+
+ return listenable;
+ }
+
+ /**
+ * Gets the listenable.
+ *
+ * @param fut Future.
+ * @return Platform listenable.
*/
- public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, Writer writer,
- PlatformAbstractTarget target) {
- listen(ctx, new InternalFutureListenable(fut), futPtr, TYP_OBJ, writer, target);
+ public static PlatformListenable getListenable(IgniteInternalFuture fut) {
+ return new InternalFutureListenable(fut);
+ }
+
+ /**
+ * Gets the listenable.
+ *
+ * @param fut Future.
+ * @return Platform listenable.
+ */
+ public static PlatformListenable getListenable(IgniteFuture fut) {
+ return new FutureListenable(fut);
}
/**
@@ -136,8 +182,8 @@ public class PlatformFutureUtils {
* @param writer Optional writer.
*/
@SuppressWarnings("unchecked")
- private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ,
- @Nullable final Writer writer, final PlatformAbstractTarget target) {
+ private static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
+ int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) {
final PlatformCallbackGateway gate = ctx.gateway();
listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
@@ -312,21 +358,9 @@ public class PlatformFutureUtils {
}
/**
- * Listenable entry.
- */
- private static interface Listenable {
- /**
- * Listen.
- *
- * @param lsnr Listener.
- */
- public void listen(IgniteBiInClosure<Object, Throwable> lsnr);
- }
-
- /**
* Listenable around Ignite future.
*/
- private static class FutureListenable implements Listenable {
+ private static class FutureListenable implements PlatformListenable {
/** Future. */
private final IgniteFuture fut;
@@ -358,12 +392,22 @@ public class PlatformFutureUtils {
}
});
}
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ return fut.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isCancelled() {
+ return fut.isCancelled();
+ }
}
/**
* Listenable around Ignite future.
*/
- private static class InternalFutureListenable implements Listenable {
+ private static class InternalFutureListenable implements PlatformListenable {
/** Future. */
private final IgniteInternalFuture fut;
@@ -392,6 +436,15 @@ public class PlatformFutureUtils {
}
});
}
- }
-}
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return fut.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isCancelled() {
+ return fut.isCancelled();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java
new file mode 100644
index 0000000..223590d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+
+/**
+ * Platform listenable.
+ */
+public interface PlatformListenable {
+ /**
+ * Listen.
+ *
+ * @param lsnr Listener.
+ */
+ public void listen(IgniteBiInClosure<Object, Throwable> lsnr);
+
+ /**
+ * Cancel this instance.
+ *
+ * @return True if canceled.
+ */
+ public boolean cancel() throws IgniteCheckedException;
+
+ /**
+ * Returns true if this listenable was canceled before completion.
+ *
+ * @return True if this listenable was canceled before completion.
+ */
+ public boolean isCancelled();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/cpp/common/include/ignite/common/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/exports.h b/modules/platforms/cpp/common/include/ignite/common/exports.h
index 23b9665..3eb775d 100644
--- a/modules/platforms/cpp/common/include/ignite/common/exports.h
+++ b/modules/platforms/cpp/common/include/ignite/common/exports.h
@@ -55,6 +55,8 @@ extern "C" {
void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ);
void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId);
+ void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ);
+ void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId);
int IGNITE_CALL IgniteAffinityPartitions(gcj::JniContext* ctx, void* obj);
@@ -80,7 +82,7 @@ extern "C" {
void IGNITE_CALL IgniteComputeWithNoFailover(gcj::JniContext* ctx, void* obj);
void IGNITE_CALL IgniteComputeWithTimeout(gcj::JniContext* ctx, void* obj, long long timeout);
- void IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer);
+ void* IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer);
void IGNITE_CALL IgniteContinuousQueryClose(gcj::JniContext* ctx, void* obj);
void* IGNITE_CALL IgniteContinuousQueryGetInitialQueryCursor(gcj::JniContext* ctx, void* obj);
@@ -153,6 +155,9 @@ extern "C" {
long long IGNITE_CALL IgniteAtomicLongCompareAndSetAndGet(gcj::JniContext* ctx, void* obj, long long expVal, long long newVal);
bool IGNITE_CALL IgniteAtomicLongIsClosed(gcj::JniContext* ctx, void* obj);
void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj);
+
+ bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj);
+ bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj);
}
#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/cpp/common/include/ignite/common/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h
index 7a2165c..e629c77 100644
--- a/modules/platforms/cpp/common/include/ignite/common/java.h
+++ b/modules/platforms/cpp/common/include/ignite/common/java.h
@@ -318,6 +318,8 @@ namespace ignite
jmethodID m_PlatformTarget_inObjectStreamOutStream;
jmethodID m_PlatformTarget_listenFuture;
jmethodID m_PlatformTarget_listenFutureForOperation;
+ jmethodID m_PlatformTarget_listenFutureAndGet;
+ jmethodID m_PlatformTarget_listenFutureForOperationAndGet;
jclass c_PlatformTransactions;
jmethodID m_PlatformTransactions_txStart;
@@ -347,6 +349,10 @@ namespace ignite
jmethodID m_PlatformAtomicLong_isClosed;
jmethodID m_PlatformAtomicLong_close;
+ jclass c_PlatformListenable;
+ jmethodID m_PlatformListenable_cancel;
+ jmethodID m_PlatformListenable_isCancelled;
+
/**
* Constructor.
*/
@@ -501,6 +507,8 @@ namespace ignite
jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL);
void TargetListenFuture(jobject obj, long long futId, int typ);
void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId);
+ void* TargetListenFutureAndGet(jobject obj, long long futId, int typ);
+ void* TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId);
int AffinityPartitions(jobject obj);
@@ -526,7 +534,7 @@ namespace ignite
void ComputeWithNoFailover(jobject obj);
void ComputeWithTimeout(jobject obj, long long timeout);
- void ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer);
+ void* ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer);
void ContinuousQueryClose(jobject obj);
jobject ContinuousQueryGetInitialQueryCursor(jobject obj);
@@ -589,6 +597,9 @@ namespace ignite
bool AtomicLongIsClosed(jobject obj);
void AtomicLongClose(jobject obj);
+ bool ListenableCancel(jobject obj);
+ bool ListenableIsCancelled(jobject obj);
+
jobject Acquire(jobject obj);
void DestroyJvm();
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/cpp/common/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/project/vs/module.def b/modules/platforms/cpp/common/project/vs/module.def
index 99cec2d..3d166bd 100644
--- a/modules/platforms/cpp/common/project/vs/module.def
+++ b/modules/platforms/cpp/common/project/vs/module.def
@@ -108,4 +108,8 @@ IgniteAtomicLongGetAndDecrement @105
IgniteAtomicLongGetAndSet @106
IgniteAtomicLongCompareAndSetAndGet @107
IgniteAtomicLongIsClosed @108
-IgniteAtomicLongClose @109
\ No newline at end of file
+IgniteAtomicLongClose @109
+IgniteListenableCancel @110
+IgniteListenableIsCancelled @111
+IgniteTargetListenFutureAndGet @112
+IgniteTargetListenFutureForOperationAndGet @113
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/cpp/common/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/src/exports.cpp b/modules/platforms/cpp/common/src/exports.cpp
index 327719e..08425a4 100644
--- a/modules/platforms/cpp/common/src/exports.cpp
+++ b/modules/platforms/cpp/common/src/exports.cpp
@@ -138,6 +138,14 @@ extern "C" {
ctx->TargetListenFutureForOperation(static_cast<jobject>(obj), futId, typ, opId);
}
+ void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ) {
+ return ctx->TargetListenFutureAndGet(static_cast<jobject>(obj), futId, typ);
+ }
+
+ void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId) {
+ return ctx->TargetListenFutureForOperationAndGet(static_cast<jobject>(obj), futId, typ, opId);
+ }
+
int IGNITE_CALL IgniteAffinityPartitions(gcj::JniContext* ctx, void* obj) {
return ctx->AffinityPartitions(static_cast<jobject>(obj));
}
@@ -219,8 +227,8 @@ extern "C" {
ctx->ComputeWithTimeout(static_cast<jobject>(obj), timeout);
}
- void IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer) {
- ctx->ComputeExecuteNative(static_cast<jobject>(obj), taskPtr, topVer);
+ void* IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer) {
+ return ctx->ComputeExecuteNative(static_cast<jobject>(obj), taskPtr, topVer);
}
void IGNITE_CALL IgniteContinuousQueryClose(gcj::JniContext* ctx, void* obj) {
@@ -458,4 +466,12 @@ extern "C" {
void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj) {
return ctx->AtomicLongClose(static_cast<jobject>(obj));
}
+
+ bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) {
+ return ctx->ListenableCancel(static_cast<jobject>(obj));
+ }
+
+ bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) {
+ return ctx->ListenableIsCancelled(static_cast<jobject>(obj));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/cpp/common/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp
index 64f5d9c..63deba5 100644
--- a/modules/platforms/cpp/common/src/java.cpp
+++ b/modules/platforms/cpp/common/src/java.cpp
@@ -211,6 +211,8 @@ namespace ignite
JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false);
JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false);
JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false);
+ JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET = JniMethod("listenFutureAndGet", "(JI)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false);
+ JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET = JniMethod("listenFutureForOperationAndGet", "(JII)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false);
const char* C_PLATFORM_CLUSTER_GRP = "org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup";
JniMethod M_PLATFORM_CLUSTER_GRP_FOR_OTHERS = JniMethod("forOthers", "(Lorg/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup;)Lorg/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup;", false);
@@ -227,7 +229,7 @@ namespace ignite
const char* C_PLATFORM_COMPUTE = "org/apache/ignite/internal/processors/platform/compute/PlatformCompute";
JniMethod M_PLATFORM_COMPUTE_WITH_NO_FAILOVER = JniMethod("withNoFailover", "()V", false);
JniMethod M_PLATFORM_COMPUTE_WITH_TIMEOUT = JniMethod("withTimeout", "(J)V", false);
- JniMethod M_PLATFORM_COMPUTE_EXECUTE_NATIVE = JniMethod("executeNative", "(JJ)V", false);
+ JniMethod M_PLATFORM_COMPUTE_EXECUTE_NATIVE = JniMethod("executeNative", "(JJ)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false);
const char* C_PLATFORM_CACHE = "org/apache/ignite/internal/processors/platform/cache/PlatformCache";
JniMethod M_PLATFORM_CACHE_WITH_SKIP_STORE = JniMethod("withSkipStore", "()Lorg/apache/ignite/internal/processors/platform/cache/PlatformCache;", false);
@@ -390,6 +392,10 @@ namespace ignite
JniMethod M_PLATFORM_ATOMIC_LONG_IS_CLOSED = JniMethod("isClosed", "()Z", false);
JniMethod M_PLATFORM_ATOMIC_LONG_CLOSE = JniMethod("close", "()V", false);
+ const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable";
+ JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false);
+ JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false);
+
/* STATIC STATE. */
gcc::CriticalSection JVM_LOCK;
JniJvm JVM;
@@ -650,6 +656,8 @@ namespace ignite
m_PlatformTarget_inObjectStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_STREAM);
m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE);
m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION);
+ m_PlatformTarget_listenFutureAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET);
+ m_PlatformTarget_listenFutureForOperationAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET);
c_PlatformTransactions = FindClass(env, C_PLATFORM_TRANSACTIONS);
m_PlatformTransactions_txStart = FindMethod(env, c_PlatformTransactions, M_PLATFORM_TRANSACTIONS_TX_START);
@@ -666,7 +674,7 @@ namespace ignite
m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA);
- jclass c_PlatformAtomicLong = FindClass(env, C_PLATFORM_ATOMIC_LONG);
+ c_PlatformAtomicLong = FindClass(env, C_PLATFORM_ATOMIC_LONG);
m_PlatformAtomicLong_get = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_GET);
m_PlatformAtomicLong_incrementAndGet = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_INCREMENT_AND_GET);
m_PlatformAtomicLong_getAndIncrement = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_GET_AND_INCREMENT);
@@ -679,6 +687,10 @@ namespace ignite
m_PlatformAtomicLong_isClosed = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_IS_CLOSED);
m_PlatformAtomicLong_close = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_CLOSE);
+ c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE);
+ m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL);
+ m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED);
+
// Find utility classes which are not used from context, but are still required in other places.
CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION);
}
@@ -1322,6 +1334,28 @@ namespace ignite
ExceptionCheck(env);
}
+ // Calls the Java method bound to m_PlatformTarget_listenFutureAndGet on obj with the
+ // given future id and type, runs ExceptionCheck, and returns the call result after
+ // passing it through LocalToGlobal.
+ void* JniContext::TargetListenFutureAndGet(jobject obj, long long futId, int typ) {
+     JNIEnv* env = Attach();
+
+     jmethodID mthd = jvm->GetMembers().m_PlatformTarget_listenFutureAndGet;
+     jobject localRef = env->CallObjectMethod(obj, mthd, futId, typ);
+
+     ExceptionCheck(env);
+
+     return LocalToGlobal(env, localRef);
+ }
+
+ // Calls the Java method bound to m_PlatformTarget_listenFutureForOperationAndGet on obj
+ // with the given future id, type and operation id, runs ExceptionCheck, and returns the
+ // call result after passing it through LocalToGlobal.
+ void* JniContext::TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId) {
+     JNIEnv* env = Attach();
+
+     jmethodID mthd = jvm->GetMembers().m_PlatformTarget_listenFutureForOperationAndGet;
+     jobject localRef = env->CallObjectMethod(obj, mthd, futId, typ, opId);
+
+     ExceptionCheck(env);
+
+     return LocalToGlobal(env, localRef);
+ }
+
int JniContext::AffinityPartitions(jobject obj) {
JNIEnv* env = Attach();
@@ -1517,12 +1551,15 @@ namespace ignite
ExceptionCheck(env);
}
- void JniContext::ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer) {
+ void* JniContext::ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer) {
JNIEnv* env = Attach();
- env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformCompute_executeNative, taskPtr, topVer);
+ jobject res = env->CallObjectMethod(obj,
+ jvm->GetMembers().m_PlatformCompute_executeNative, taskPtr, topVer);
ExceptionCheck(env);
+
+ return LocalToGlobal(env, res);
}
void JniContext::ContinuousQueryClose(jobject obj) {
@@ -1536,7 +1573,8 @@ namespace ignite
jobject JniContext::ContinuousQueryGetInitialQueryCursor(jobject obj) {
JNIEnv* env = Attach();
- jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformContinuousQuery_getInitialQueryCursor);
+ jobject res = env->CallObjectMethod(obj,
+ jvm->GetMembers().m_PlatformContinuousQuery_getInitialQueryCursor);
ExceptionCheck(env);
@@ -2037,6 +2075,28 @@ namespace ignite
ExceptionCheck(env);
}
+ // Calls the Java method bound to m_PlatformListenable_cancel on obj and returns its
+ // boolean result. ExceptionCheck runs before the result is returned.
+ bool JniContext::ListenableCancel(jobject obj)
+ {
+     JNIEnv* env = Attach();
+
+     jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_cancel);
+
+     ExceptionCheck(env);
+
+     // jboolean is an integral type; normalize it to a C++ bool.
+     return res != 0;
+ }
+
+ // Calls the Java method bound to m_PlatformListenable_isCancelled on obj and returns its
+ // boolean result. ExceptionCheck runs before the result is returned.
+ bool JniContext::ListenableIsCancelled(jobject obj)
+ {
+     JNIEnv* env = Attach();
+
+     jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled);
+
+     ExceptionCheck(env);
+
+     // jboolean is an integral type; normalize it to a C++ bool.
+     return res != 0;
+ }
+
jobject JniContext::Acquire(jobject obj)
{
if (obj) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index 87b7f9d..fe7d78f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -1005,6 +1005,25 @@ namespace Apache.Ignite.Core.Tests.Compute
}
/// <summary>
+ /// Tests cancellation of a single asynchronous action run.
+ /// </summary>
+ [Test]
+ public void TestRunActionAsyncCancel()
+ {
+ using (var cts = new CancellationTokenSource())
+ {
+ // Cancel while executing
+ var task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
+ cts.Cancel();
+ Assert.IsTrue(task.IsCanceled);
+
+ // Use cancelled token
+ task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
+ Assert.IsTrue(task.IsCanceled);
+ }
+ }
+
+ /// <summary>
/// Tests multiple actions run.
/// </summary>
[Test]
@@ -1274,6 +1293,7 @@ namespace Apache.Ignite.Core.Tests.Compute
public void Invoke()
{
+ Thread.Sleep(10);
Interlocked.Increment(ref InvokeCount);
LastNodeId = _grid.GetCluster().GetLocalNode().Id;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index f758863..12404be 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -104,6 +104,7 @@
<Compile Include="Cluster\IClusterNodeFilter.cs" />
<Compile Include="Cluster\Package-Info.cs" />
<Compile Include="Common\IgniteException.cs" />
+ <Compile Include="Common\IgniteFutureCancelledException.cs" />
<Compile Include="Common\IgniteGuid.cs" />
<Compile Include="Common\Package-Info.cs" />
<Compile Include="Compute\ComputeExecutionRejectedException.cs" />
@@ -187,6 +188,7 @@
<Compile Include="Impl\Collections\MultiValueDictionary.cs" />
<Compile Include="Impl\Collections\ReadOnlyCollection.cs" />
<Compile Include="Impl\Collections\ReadOnlyDictionary.cs" />
+ <Compile Include="Impl\Common\CancelledTask.cs" />
<Compile Include="Impl\Common\Classpath.cs" />
<Compile Include="Impl\Common\CopyOnWriteConcurrentDictionary.cs" />
<Compile Include="Impl\Common\DelegateConverter.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs
new file mode 100644
index 0000000..02433ce
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Common
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ /// <summary>
+ /// Indicates future cancellation within Ignite.
+ /// </summary>
+ [Serializable]
+ public class IgniteFutureCancelledException : IgniteException
+ {
+     /// <summary>
+     /// Initializes a new instance of the <see cref="IgniteFutureCancelledException"/> class.
+     /// </summary>
+     public IgniteFutureCancelledException()
+     {
+         // No-op.
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="IgniteFutureCancelledException"/> class.
+     /// </summary>
+     /// <param name="message">The message that describes the error.</param>
+     public IgniteFutureCancelledException(string message) : base(message)
+     {
+         // No-op.
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="IgniteFutureCancelledException"/> class.
+     /// </summary>
+     /// <param name="message">The message.</param>
+     /// <param name="cause">The cause.</param>
+     public IgniteFutureCancelledException(string message, Exception cause) : base(message, cause)
+     {
+         // No-op.
+     }
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="IgniteFutureCancelledException"/> class
+     /// from serialized data. The type is marked <see cref="SerializableAttribute"/> because
+     /// that attribute is not inherited from the base exception class.
+     /// </summary>
+     /// <param name="info">Serialization information.</param>
+     /// <param name="ctx">Streaming context.</param>
+     protected IgniteFutureCancelledException(SerializationInfo info, StreamingContext ctx) : base(info, ctx)
+     {
+         // No-op.
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
index d818153..a677f39 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Compute
{
using System;
using System.Collections.Generic;
+ using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cluster;
@@ -98,6 +99,19 @@ namespace Apache.Ignite.Core.Compute
Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg);
/// <summary>
+ /// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
+ /// then 'taskName' will be used as task class name to auto-deploy the task.
+ /// </summary>
+ /// <typeparam name="TRes">Type of task result.</typeparam>
+ /// <param name="taskName">Java task name</param>
+ /// <param name="taskArg">Optional argument of task execution, can be null.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task result.
+ /// </returns>
+ Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
/// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
/// </summary>
@@ -123,6 +137,22 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}" /> documentation.
+ /// </summary>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of final task result.</typeparam>
+ /// <param name="task">Task to execute.</param>
+ /// <param name="taskArg">Optional task argument.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task result.
+ /// </returns>
+ Task<TRes> ExecuteAsync<TArg, TJobRes, TRes>(IComputeTask<TArg, TJobRes, TRes> task, TArg taskArg,
+ CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
/// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
/// </summary>
/// <param name="task">Task to execute.</param>
@@ -143,6 +173,19 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}" /> documentation.
+ /// </summary>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
+ /// <param name="task">Task to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task result.
+ /// </returns>
+ Task<TRes> ExecuteAsync<TJobRes, TRes>(IComputeTask<TJobRes, TRes> task, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
/// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
/// </summary>
/// <param name="taskType">Task type.</param>
@@ -167,6 +210,21 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}" /> documentation.
+ /// </summary>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
+ /// <param name="taskType">Task type.</param>
+ /// <param name="taskArg">Optional task argument.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task result.
+ /// </returns>
+ Task<TRes> ExecuteAsync<TArg, TJobRes, TRes>(Type taskType, TArg taskArg, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
/// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
/// </summary>
/// <param name="taskType">Task type.</param>
@@ -186,6 +244,19 @@ namespace Apache.Ignite.Core.Compute
Task<TRes> ExecuteAsync<TJobRes, TRes>(Type taskType);
/// <summary>
+ /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+ /// refer to <see cref="IComputeTask{A,T,R}" /> documentation.
+ /// </summary>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
+ /// <param name="taskType">Task type.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task result.
+ /// </returns>
+ Task<TRes> ExecuteAsync<TJobRes, TRes>(Type taskType, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes provided job on a node in this grid projection. The result of the
/// job execution is returned from the result closure.
/// </summary>
@@ -204,6 +275,18 @@ namespace Apache.Ignite.Core.Compute
Task<TRes> CallAsync<TRes>(IComputeFunc<TRes> clo);
/// <summary>
+ /// Executes provided job on a node in this grid projection. The result of the
+ /// job execution is returned from the result closure.
+ /// </summary>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clo">Job to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Job result for this execution.
+ /// </returns>
+ Task<TRes> CallAsync<TRes>(IComputeFunc<TRes> clo, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes given job on the node where data for provided affinity key is located
/// (a.k.a. affinity co-location).
/// </summary>
@@ -226,6 +309,21 @@ namespace Apache.Ignite.Core.Compute
Task<TRes> AffinityCallAsync<TRes>(string cacheName, object affinityKey, IComputeFunc<TRes> clo);
/// <summary>
+ /// Executes given job on the node where data for provided affinity key is located
+ /// (a.k.a. affinity co-location).
+ /// </summary>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+ /// <param name="affinityKey">Affinity key.</param>
+ /// <param name="clo">Job to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Job result for this execution.
+ /// </returns>
+ Task<TRes> AffinityCallAsync<TRes>(string cacheName, object affinityKey, IComputeFunc<TRes> clo,
+ CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes collection of jobs on nodes within this grid projection.
/// </summary>
/// <param name="clos">Collection of jobs to execute.</param>
@@ -249,6 +347,20 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Executes collection of jobs on nodes within this grid projection.
/// </summary>
+ /// <typeparam name="TFuncRes">Type of function result.</typeparam>
+ /// <typeparam name="TRes">Type of result after reduce.</typeparam>
+ /// <param name="clos">Collection of jobs to execute.</param>
+ /// <param name="reducer">Reducer to reduce all job results into one individual return value.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Reduced job result for this execution.
+ /// </returns>
+ Task<TRes> CallAsync<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos,
+ IComputeReducer<TFuncRes, TRes> reducer, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Executes collection of jobs on nodes within this grid projection.
+ /// </summary>
/// <param name="clos">Collection of jobs to execute.</param>
/// <returns>Collection of job results for this execution.</returns>
/// <typeparam name="TRes">Type of job result.</typeparam>
@@ -263,6 +375,18 @@ namespace Apache.Ignite.Core.Compute
Task<ICollection<TRes>> CallAsync<TRes>(IEnumerable<IComputeFunc<TRes>> clos);
/// <summary>
+ /// Executes collection of jobs on nodes within this grid projection.
+ /// </summary>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clos">Collection of jobs to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Collection of job results for this execution.
+ /// </returns>
+ Task<ICollection<TRes>> CallAsync<TRes>(IEnumerable<IComputeFunc<TRes>> clos,
+ CancellationToken cancellationToken);
+
+ /// <summary>
/// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
/// </summary>
/// <param name="clo">Job to broadcast to all projection nodes.</param>
@@ -277,6 +401,17 @@ namespace Apache.Ignite.Core.Compute
Task<ICollection<TRes>> BroadcastAsync<TRes>(IComputeFunc<TRes> clo);
/// <summary>
+ /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
+ /// </summary>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clo">Job to broadcast to all projection nodes.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Collection of results for this execution.
+ /// </returns>
+ Task<ICollection<TRes>> BroadcastAsync<TRes>(IComputeFunc<TRes> clo, CancellationToken cancellationToken);
+
+ /// <summary>
/// Broadcasts given closure job with passed in argument to all nodes in grid projection.
/// Every participating node will return a job result.
/// </summary>
@@ -299,6 +434,21 @@ namespace Apache.Ignite.Core.Compute
Task<ICollection<TRes>> BroadcastAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg);
/// <summary>
+ /// Broadcasts given closure job with passed in argument to all nodes in grid projection.
+ /// Every participating node will return a job result.
+ /// </summary>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clo">Job to broadcast to all projection nodes.</param>
+ /// <param name="arg">Job closure argument.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Collection of results for this execution.
+ /// </returns>
+ Task<ICollection<TRes>> BroadcastAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg,
+ CancellationToken cancellationToken);
+
+ /// <summary>
/// Broadcasts given job to all nodes in grid projection.
/// </summary>
/// <param name="action">Job to broadcast to all projection nodes.</param>
@@ -311,6 +461,14 @@ namespace Apache.Ignite.Core.Compute
Task BroadcastAsync(IComputeAction action);
/// <summary>
+ /// Broadcasts given job to all nodes in grid projection.
+ /// </summary>
+ /// <param name="action">Job to broadcast to all projection nodes.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>Task.</returns>
+ Task BroadcastAsync(IComputeAction action, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes provided job on a node in this grid projection.
/// </summary>
/// <param name="action">Job to execute.</param>
@@ -323,6 +481,13 @@ namespace Apache.Ignite.Core.Compute
Task RunAsync(IComputeAction action);
/// <summary>
+ /// Executes provided job on a node in this grid projection.
+ /// </summary>
+ /// <param name="action">Job to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>Task.</returns>
+ Task RunAsync(IComputeAction action, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes given job on the node where data for provided affinity key is located
/// (a.k.a. affinity co-location).
/// </summary>
@@ -341,6 +506,18 @@ namespace Apache.Ignite.Core.Compute
Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action);
/// <summary>
+ /// Executes given job on the node where data for provided affinity key is located
+ /// (a.k.a. affinity co-location).
+ /// </summary>
+ /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+ /// <param name="affinityKey">Affinity key.</param>
+ /// <param name="action">Job to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>Task.</returns>
+ Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action,
+ CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes collection of jobs on Ignite nodes within this grid projection.
/// </summary>
/// <param name="actions">Jobs to execute.</param>
@@ -353,6 +530,14 @@ namespace Apache.Ignite.Core.Compute
Task RunAsync(IEnumerable<IComputeAction> actions);
/// <summary>
+ /// Executes collection of jobs on Ignite nodes within this grid projection.
+ /// </summary>
+ /// <param name="actions">Jobs to execute.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>Task.</returns>
+ Task RunAsync(IEnumerable<IComputeAction> actions, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes provided closure job on a node in this grid projection.
/// </summary>
/// <param name="clo">Job to run.</param>
@@ -373,6 +558,19 @@ namespace Apache.Ignite.Core.Compute
Task<TRes> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg);
/// <summary>
+ /// Executes provided closure job on a node in this grid projection.
+ /// </summary>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clo">Job to run.</param>
+ /// <param name="arg">Job argument.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Job result for this execution.
+ /// </returns>
+ Task<TRes> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg, CancellationToken cancellationToken);
+
+ /// <summary>
/// Executes provided closure job on nodes within this grid projection. A new job is executed for
/// every argument in the passed in collection. The number of actual job executions will be
/// equal to size of the job arguments collection.
@@ -399,6 +597,22 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Executes provided closure job on nodes within this grid projection. A new job is executed for
/// every argument in the passed in collection. The number of actual job executions will be
+ /// equal to size of the job arguments collection.
+ /// </summary>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
+ /// <param name="clo">Job to run.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Collection of job results.
+ /// </returns>
+ Task<ICollection<TRes>> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, IEnumerable<TArg> args,
+ CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+ /// every argument in the passed in collection. The number of actual job executions will be
/// equal to size of the job arguments collection. The returned job results will be reduced
/// into an individual result by provided reducer.
/// </summary>
@@ -427,5 +641,24 @@ namespace Apache.Ignite.Core.Compute
/// <typeparam name="TRes">Type of result after reduce.</typeparam>
Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args,
IComputeReducer<TFuncRes, TRes> rdc);
+
+ /// <summary>
+ /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+ /// every argument in the passed in collection. The number of actual job executions will be
+ /// equal to size of the job arguments collection. The returned job results will be reduced
+ /// into an individual result by provided reducer.
+ /// </summary>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TFuncRes">Type of function result.</typeparam>
+ /// <typeparam name="TRes">Type of result after reduce.</typeparam>
+ /// <param name="clo">Job to run.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Reduced job result for this execution.
+ /// </returns>
+ Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args,
+ IComputeReducer<TFuncRes, TRes> rdc, CancellationToken cancellationToken);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs
new file mode 100644
index 0000000..0a84d81
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Provides cancelled tasks of given type.
+ /// One task per <typeparamref name="T"/> is created in the static constructor and
+ /// the same instance is returned to every caller.
+ /// </summary>
+ /// <typeparam name="T">Task result type.</typeparam>
+ internal static class CancelledTask<T>
+ {
+ /** Task source; put into the cancelled state once by the static constructor. */
+ private static readonly TaskCompletionSource<T> TaskCompletionSource;
+
+ /// <summary>
+ /// Initializes the <see cref="CancelledTask{T}"/> class.
+ /// </summary>
+ static CancelledTask()
+ {
+ TaskCompletionSource = new TaskCompletionSource<T>();
+ TaskCompletionSource.SetCanceled();
+ }
+
+ /// <summary>
+ /// Gets the cancelled task.
+ /// </summary>
+ public static Task<T> Instance
+ {
+ get { return TaskCompletionSource.Task; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
index 9460be6..0325b71 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
@@ -18,9 +18,13 @@
namespace Apache.Ignite.Core.Impl.Common
{
using System;
+ using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Grid future implementation.
@@ -35,6 +39,9 @@ namespace Apache.Ignite.Core.Impl.Common
/** Task completion source. */
private readonly TaskCompletionSource<T> _taskCompletionSource = new TaskCompletionSource<T>();
+ /** Unmanaged future target used for cancellation; null until SetTarget is called. */
+ private volatile IUnmanagedTarget _unmanagedTarget;
+
/// <summary>
/// Constructor.
/// </summary>
@@ -44,7 +51,9 @@ namespace Apache.Ignite.Core.Impl.Common
_converter = converter;
}
- /** <inheritdoc/> */
+ /// <summary>
+ /// Gets the result.
+ /// </summary>
public T Get()
{
try
@@ -57,12 +66,28 @@ namespace Apache.Ignite.Core.Impl.Common
}
}
- /** <inheritdoc/> */
+ /// <summary>
+ /// Gets the task.
+ /// </summary>
public Task<T> Task
{
get { return _taskCompletionSource.Task; }
}
+ /// <summary>
+ /// Gets the task with cancellation.
+ /// </summary>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// The same task as <see cref="Task"/>; cancelling the token calls <see cref="Cancel"/> on this future.
+ /// </returns>
+ public Task<T> GetTask(CancellationToken cancellationToken)
+ {
+     Debug.Assert(_unmanagedTarget != null);
+
+     var task = Task;
+
+     // OnTokenCancel will fire even if cancellationToken is already cancelled.
+     var registration = cancellationToken.Register(OnTokenCancel);
+
+     // Unregister the callback once the future completes. Otherwise a long-lived token
+     // keeps every finished future registered and still calls back on it later.
+     // The default scheduler is used so that disposal never blocks the completing thread.
+     task.ContinueWith(_ => registration.Dispose(), CancellationToken.None,
+         TaskContinuationOptions.None, TaskScheduler.Default);
+
+     return task;
+ }
+
+
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
public void OnResult(IBinaryStream stream)
@@ -80,7 +105,10 @@ namespace Apache.Ignite.Core.Impl.Common
/** <inheritdoc /> */
public void OnError(Exception err)
{
- _taskCompletionSource.TrySetException(err);
+ if (err is IgniteFutureCancelledException)
+ _taskCompletionSource.TrySetCanceled();
+ else
+ _taskCompletionSource.TrySetException(err);
}
/** <inheritdoc /> */
@@ -124,5 +152,45 @@ namespace Apache.Ignite.Core.Impl.Common
else
OnResult(res);
}
+
+ /// <summary>
+ /// Sets unmanaged future target for cancellation.
+ /// </summary>
+ /// <param name="target">Unmanaged target; must not be null (checked with a debug assertion).</param>
+ internal void SetTarget(IUnmanagedTarget target)
+ {
+ Debug.Assert(target != null);
+
+ _unmanagedTarget = target;
+ }
+
+ /// <summary>
+ /// Requests cancellation of the unmanaged future behind this instance.
+ /// </summary>
+ /// <returns>
+ /// False when no unmanaged target has been set; otherwise the result of the unmanaged cancel call.
+ /// </returns>
+ internal bool Cancel()
+ {
+     var target = _unmanagedTarget;
+
+     return target != null && UnmanagedUtils.ListenableCancel(target);
+ }
+
+ /// <summary>
+ /// Checks whether the unmanaged future behind this instance is cancelled.
+ /// </summary>
+ /// <returns>
+ /// False when no unmanaged target has been set; otherwise the result of the unmanaged check.
+ /// </returns>
+ internal bool IsCancelled()
+ {
+     var target = _unmanagedTarget;
+
+     return target != null && UnmanagedUtils.ListenableIsCancelled(target);
+ }
+
+ /// <summary>
+ /// Called when token cancellation occurs.
+ /// </summary>
+ private void OnTokenCancel()
+ {
+ // The boolean result of Cancel is discarded.
+ Cancel();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
index 0f8fd33..300e944 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
@@ -20,9 +20,11 @@ namespace Apache.Ignite.Core.Impl.Compute
using System;
using System.Collections.Generic;
using System.Diagnostics;
+ using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Common;
/// <summary>
/// Synchronous Compute facade.
@@ -86,6 +88,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> ExecuteJavaTaskAsync<TRes>(string taskName, object taskArg,
+     CancellationToken cancellationToken)
+ {
+     // Use the task from GetTaskIfAlreadyCancelled when it returns one; otherwise start the Java task.
+     var cancelled = GetTaskIfAlreadyCancelled<TRes>(cancellationToken);
+
+     if (cancelled != null)
+         return cancelled;
+
+     return _compute.ExecuteJavaTaskAsync<TRes>(taskName, taskArg).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TReduceRes Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg)
{
return _compute.Execute(task, taskArg).Get();
@@ -98,6 +108,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> ExecuteAsync<TArg, TJobRes, TRes>(IComputeTask<TArg, TJobRes, TRes> task, TArg taskArg,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute(task, taskArg).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TJobRes Execute<TArg, TJobRes>(IComputeTask<TArg, TJobRes> task)
{
return _compute.Execute(task, null).Get();
@@ -110,6 +128,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> ExecuteAsync<TJobRes, TRes>(IComputeTask<TJobRes, TRes> task,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute(task, null).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TReduceRes Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg)
{
return _compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg).Get();
@@ -122,6 +148,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TReduceRes> ExecuteAsync<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TReduceRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TReduceRes Execute<TArg, TReduceRes>(Type taskType)
{
return _compute.Execute<object, TArg, TReduceRes>(taskType, null).Get();
@@ -134,6 +168,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TReduceRes> ExecuteAsync<TArg, TReduceRes>(Type taskType, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TReduceRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute<object, TArg, TReduceRes>(taskType, null).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TJobRes Call<TJobRes>(IComputeFunc<TJobRes> clo)
{
return _compute.Execute(clo).Get();
@@ -146,6 +187,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> CallAsync<TRes>(IComputeFunc<TRes> clo, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute(clo).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TJobRes AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo)
{
return _compute.AffinityCall(cacheName, affinityKey, clo).Get();
@@ -158,18 +206,35 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> AffinityCallAsync<TRes>(string cacheName, object affinityKey, IComputeFunc<TRes> clo,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.AffinityCall(cacheName, affinityKey, clo).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TJobRes Call<TJobRes>(Func<TJobRes> func)
{
return _compute.Execute(func).Get();
}
/** <inheritDoc /> */
- public Task<TRes> CallAsync<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos, IComputeReducer<TFuncRes, TRes> reducer)
+ public Task<TRes> CallAsync<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos,
+ IComputeReducer<TFuncRes, TRes> reducer)
{
return _compute.Execute(clos, reducer).Task;
}
/** <inheritDoc /> */
+ public Task<TRes> CallAsync<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos,
+ IComputeReducer<TFuncRes, TRes> reducer, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Execute(clos, reducer).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public ICollection<TJobRes> Call<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos)
{
return _compute.Execute(clos).Get();
@@ -182,6 +247,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<ICollection<TRes>> CallAsync<TRes>(IEnumerable<IComputeFunc<TRes>> clos,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<ICollection<TRes>>(cancellationToken); // Null unless cancelled.
+ return cancelledTask ?? _compute.Execute(clos).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TReduceRes Call<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos,
IComputeReducer<TJobRes, TReduceRes> reducer)
{
@@ -201,6 +274,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<ICollection<TRes>> BroadcastAsync<TRes>(IComputeFunc<TRes> clo, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<ICollection<TRes>>(cancellationToken); // Null unless cancelled.
+ return cancelledTask ?? _compute.Broadcast(clo).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public ICollection<TJobRes> Broadcast<T, TJobRes>(IComputeFunc<T, TJobRes> clo, T arg)
{
return _compute.Broadcast(clo, arg).Get();
@@ -213,6 +293,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<ICollection<TRes>> BroadcastAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<ICollection<TRes>>(cancellationToken); // Null unless cancelled.
+ return cancelledTask ?? _compute.Broadcast(clo, arg).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public void Broadcast(IComputeAction action)
{
_compute.Broadcast(action).Get();
@@ -225,6 +313,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task BroadcastAsync(IComputeAction action, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<object>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Broadcast(action).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public void Run(IComputeAction action)
{
_compute.Run(action).Get();
@@ -237,6 +332,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task RunAsync(IComputeAction action, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<object>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Run(action).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
{
_compute.AffinityRun(cacheName, affinityKey, action).Get();
@@ -249,6 +351,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<object>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.AffinityRun(cacheName, affinityKey, action).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public void Run(IEnumerable<IComputeAction> actions)
{
_compute.Run(actions).Get();
@@ -261,6 +371,13 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task RunAsync(IEnumerable<IComputeAction> actions, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<object>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Run(actions).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TJobRes Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
return _compute.Apply(clo, arg).Get();
@@ -273,6 +390,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<TRes> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Apply(clo, arg).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public ICollection<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args)
{
return _compute.Apply(clo, args).Get();
@@ -285,6 +410,14 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
+ public Task<ICollection<TRes>> ApplyAsync<TArg, TRes>(IComputeFunc<TArg, TRes> clo, IEnumerable<TArg> args,
+ CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<ICollection<TRes>>(cancellationToken); // Null unless cancelled.
+ return cancelledTask ?? _compute.Apply(clo, args).GetTask(cancellationToken);
+ }
+
+ /** <inheritDoc /> */
public TReduceRes Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo,
IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc)
{
@@ -292,9 +425,29 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args, IComputeReducer<TFuncRes, TRes> rdc)
+ public Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args,
+ IComputeReducer<TFuncRes, TRes> rdc)
{
return _compute.Apply(clo, args, rdc).Task;
}
+
+ /** <inheritDoc /> */
+ public Task<TRes> ApplyAsync<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args,
+ IComputeReducer<TFuncRes, TRes> rdc, CancellationToken cancellationToken)
+ {
+ var cancelledTask = GetTaskIfAlreadyCancelled<TRes>(cancellationToken); // Null unless already cancelled.
+ return cancelledTask ?? _compute.Apply(clo, args, rdc).GetTask(cancellationToken);
+ }
+
+ /// <summary>
+ /// Returns the static cancelled task when cancellation has already been requested on the token.
+ /// </summary>
+ /// <returns>
+ /// <c>CancelledTask&lt;T&gt;.Instance</c> if cancellation is requested; otherwise <c>null</c>.
+ /// </returns>
+ private static Task<T> GetTaskIfAlreadyCancelled<T>(CancellationToken cancellationToken)
+ {
+ return cancellationToken.IsCancellationRequested ? CancelledTask<T>.Instance : null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index b44b2ee..86dee30 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -165,7 +165,8 @@ namespace Apache.Ignite.Core.Impl.Compute
WriteTask(writer, taskName, taskArg, nodes);
}, input =>
{
- fut = GetFuture<TReduceRes>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepBinary.Value);
+ fut = GetFuture<TReduceRes>((futId, futTyp) =>
+ UU.TargetListenFutureAndGet(Target, futId, futTyp), _keepBinary.Value);
});
return fut;
@@ -192,9 +193,13 @@ namespace Apache.Ignite.Core.Impl.Compute
long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
- UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion);
+ var futTarget = UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion);
- return holder.Future;
+ var future = holder.Future;
+
+ future.SetTarget(futTarget);
+
+ return future;
}
/// <summary>
@@ -522,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Compute
try
{
- DoOutOp(opId, writer =>
+ var futTarget = DoOutOpObject(opId, writer =>
{
writer.WriteLong(taskHandle);
@@ -546,6 +551,8 @@ namespace Apache.Ignite.Core.Impl.Compute
if (writeAction != null)
writeAction(writer);
});
+
+ holder.Future.SetTarget(futTarget);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
index e0735e1..665d37e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
@@ -95,6 +95,10 @@ namespace Apache.Ignite.Core.Impl
// Security exceptions.
EXS["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m);
EXS["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m);
+
+ // Future exceptions.
+ EXS["org.apache.ignite.lang.IgniteFutureCancelledException"] = m => new IgniteFutureCancelledException(m);
+ EXS["org.apache.ignite.internal.IgniteFutureCancelledCheckedException"] = m => new IgniteFutureCancelledException(m);
}
/// <summary>