You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/03/15 11:02:21 UTC
ignite git commit: IGNITE-4729 Async operation support in platform
plugins
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 be93baa75 -> 637c18de1
IGNITE-4729 Async operation support in platform plugins
This closes #1561
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/637c18de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/637c18de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/637c18de
Branch: refs/heads/ignite-2.0
Commit: 637c18de190515293e01434862004a410cfadd53
Parents: be93baa
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Wed Mar 15 14:02:12 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Wed Mar 15 14:02:12 2017 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 8 ++
.../platform/PlatformAsyncResult.java | 41 +++++++++
.../processors/platform/PlatformTarget.java | 10 +++
.../platform/PlatformTargetProxy.java | 9 ++
.../platform/PlatformTargetProxyImpl.java | 39 +++++++++
.../plugin/PlatformTestPluginTarget.java | 89 +++++++++++++++++++-
.../cpp/jni/include/ignite/jni/exports.h | 1 +
.../platforms/cpp/jni/include/ignite/jni/java.h | 2 +
modules/platforms/cpp/jni/project/vs/module.def | 1 +
modules/platforms/cpp/jni/src/exports.cpp | 4 +
modules/platforms/cpp/jni/src/java.cpp | 10 +++
.../Plugin/PluginTest.cs | 17 ++++
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 29 +++++++
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 +
.../Impl/Unmanaged/UnmanagedUtils.cs | 5 ++
.../Interop/IPlatformTarget.cs | 13 +++
16 files changed, 277 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 506470b..396e784 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -125,6 +125,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget, Platform
return throwUnsupported(type);
}
+ /** {@inheritDoc} */
+ @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
+ throwUnsupported(type);
+
+ return null;
+ }
+
/**
* Throw an exception rendering unsupported operation type.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
new file mode 100644
index 0000000..879f85d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * Represents asynchronous operation result.
+ */
+public interface PlatformAsyncResult {
+ /**
+ * Async operation future.
+ *
+ * @return Future.
+ */
+ IgniteFuture future();
+
+ /**
+ * Async operation result writer method.
+ *
+ * @param writer Writer.
+ * @param result Async operation result.
+ */
+ void write(BinaryRawWriterEx writer, Object result);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 5d234dd..9792df8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -108,6 +108,16 @@ public interface PlatformTarget {
PlatformTarget processOutObject(int type) throws IgniteCheckedException;
/**
+ * Process asynchronous operation.
+ *
+ * @param type Type.
+ * @param reader Binary reader.
+ * @return Async result (should not be null).
+ * @throws IgniteCheckedException In case of exception.
+ */
+ PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
+
+ /**
* Convert caught exception.
*
* @param e Exception to convert.
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
index a4f2a56..c2a0797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -97,6 +97,15 @@ public interface PlatformTargetProxy {
Object outObject(int type) throws Exception;
/**
+ * Asynchronous operation accepting memory stream.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @throws Exception If case of failure.
+ */
+ void inStreamAsync(int type, long memPtr) throws Exception;
+
+ /**
* Start listening for the future.
*
* @param futId Future ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
index 25a4de8..7e0036d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -18,12 +18,14 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.lang.IgniteFuture;
/**
* Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
@@ -104,6 +106,43 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
}
/** {@inheritDoc} */
+ @Override public void inStreamAsync(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+ long futId = reader.readLong();
+ int futTyp = reader.readInt();
+
+ final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
+
+ if (res == null) {
+ throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
+ }
+
+ IgniteFuture fut = res.future();
+
+ if (fut == null) {
+ throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+ }
+
+ PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
+ /** {@inheritDoc} */
+ @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
+ res.write(writer, obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return err == null;
+ }
+ }, target);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
BinaryRawReaderEx reader = platformCtx.reader(inMem);
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
index e80a23f..7e69425 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
@@ -21,11 +21,14 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformAsyncResult;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.PluginConfiguration;
import org.jetbrains.annotations.Nullable;
@@ -33,17 +36,20 @@ import org.jetbrains.annotations.Nullable;
* Test target.
*/
@SuppressWarnings("ConstantConditions")
-class PlatformTestPluginTarget extends PlatformAbstractTarget {
+class PlatformTestPluginTarget implements PlatformTarget {
/** */
private final String name;
+ /** */
+ private final PlatformContext platformCtx;
+
/**
* Constructor.
*
* @param platformCtx Context.
*/
PlatformTestPluginTarget(PlatformContext platformCtx, String name) {
- super(platformCtx);
+ this.platformCtx = platformCtx;
if (name == null) {
// Initialize from configuration.
@@ -65,12 +71,17 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget {
return val + 1;
}
- /** {@inheritDoc} */
@Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
return reader.readString().length();
}
/** {@inheritDoc} */
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
+ throws IgniteCheckedException {
+ return processInStreamOutLong(type, reader);
+ }
+
+ /** {@inheritDoc} */
@Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
String s = reader.readString();
@@ -129,6 +140,76 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget {
return new PlatformTestPluginTarget(platformCtx, name);
}
+ /** {@inheritDoc} */
+ @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case 1: {
+ // Async upper case.
+ final String val = reader.readString();
+ final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+ new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ Thread.sleep(500L);
+ fa.onDone(val.toUpperCase());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+
+ return new PlatformAsyncResult() {
+ @Override public IgniteFuture future() {
+ //noinspection unchecked
+ return new IgniteFutureImpl(fa);
+ }
+
+ @Override public void write(BinaryRawWriterEx writer, Object result) {
+ writer.writeString((String) result);
+ }
+ };
+ }
+ case 2: {
+ // Exception.
+ throw new PlatformTestPluginException("123");
+ }
+ case 3: {
+ // Async exception.
+ final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+ new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ Thread.sleep(500L);
+ fa.onDone(new PlatformTestPluginException("x"));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+
+ return new PlatformAsyncResult() {
+ @Override public IgniteFuture future() {
+ //noinspection unchecked
+ return new IgniteFutureImpl(fa);
+ }
+
+ @Override public void write(BinaryRawWriterEx writer, Object result) {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Exception convertException(Exception e) {
+ return e;
+ }
+
/**
* Gets the plugin config.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index a93f580..06be75d 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -66,6 +66,7 @@ extern "C" {
void* IGNITE_CALL IgniteTargetInObjectStreamOutObjectStream(gcj::JniContext* ctx, void* obj, int opType, void* arg, long long inMemPtr, long long outMemPtr);
void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
+ void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ);
void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index a07b844..7c5d684 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -208,6 +208,7 @@ namespace ignite
jmethodID m_PlatformTarget_inStreamOutObject;
jmethodID m_PlatformTarget_outStream;
jmethodID m_PlatformTarget_outObject;
+ jmethodID m_PlatformTarget_inStreamAsync;
jmethodID m_PlatformTarget_inStreamOutStream;
jmethodID m_PlatformTarget_inObjectStreamOutObjectStream;
jmethodID m_PlatformTarget_listenFuture;
@@ -387,6 +388,7 @@ namespace ignite
jobject TargetInObjectStreamOutObjectStream(jobject obj, int opType, void* arg, long long inMemPtr, long long outMemPtr, JniErrorInfo* errInfo = NULL);
void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL);
+ void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
void TargetListenFuture(jobject obj, long long futId, int typ);
void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index 45a5bff..8159f8d 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -23,6 +23,7 @@ IgniteTargetInObjectStreamOutObjectStream @21
IgniteTargetListenFuture @22
IgniteTargetListenFutureForOperation @23
IgniteTargetInLongOutLong @24
+IgniteTargetInStreamAsync @25
IgniteProcessorCompute @64
IgniteProcessorMessage @65
IgniteProcessorEvents @66
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index 17fed71..6c590e4 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -182,6 +182,10 @@ extern "C" {
return ctx->TargetOutObject(static_cast<jobject>(obj), opType);
}
+ void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) {
+ ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr);
+ }
+
void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ) {
ctx->TargetListenFuture(static_cast<jobject>(obj), futId, typ);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 1988a86..004a99c 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -258,6 +258,7 @@ namespace ignite
JniMethod M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM = JniMethod("inObjectStreamOutObjectStream", "(ILjava/lang/Object;JJ)Ljava/lang/Object;", false);
JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false);
JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false);
+ JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false);
JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false);
JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false);
@@ -590,6 +591,7 @@ namespace ignite
m_PlatformTarget_outObject = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_OUT_OBJECT);
m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM);
m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM);
+ m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC);
m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE);
m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION);
@@ -1386,6 +1388,14 @@ namespace ignite
return LocalToGlobal(env, res);
}
+ void JniContext::TargetInStreamAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) {
+ JNIEnv* env = Attach();
+
+ env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamAsync, opType, memPtr);
+
+ ExceptionCheck(env, err);
+ }
+
void JniContext::TargetListenFuture(jobject obj, long long futId, int typ) {
JNIEnv* env = Attach();
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
index b6c00b5..8256bba 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Plugin
using System;
using System.Collections.Generic;
using System.IO;
+ using System.Linq;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Interop;
@@ -117,6 +118,22 @@ namespace Apache.Ignite.Core.Tests.Plugin
var resCopy = res.Item2.OutObject(1);
Assert.AreEqual("name1_abc", resCopy.OutStream(1, r => r.ReadString()));
+ // Async operation.
+ var task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString());
+ Assert.IsFalse(task.IsCompleted);
+ var asyncRes = task.Result;
+ Assert.IsTrue(task.IsCompleted);
+ Assert.AreEqual("FOO", asyncRes);
+
+ // Async operation with exception in entry point.
+ Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null));
+
+ // Async operation with exception in future.
+ var errTask = target.DoOutOpAsync<object>(3, null, null);
+ Assert.IsFalse(errTask.IsCompleted);
+ var aex = Assert.Throws<AggregateException>(() => errTask.Wait());
+ Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single());
+
// Throws custom mapped exception.
var ex = Assert.Throws<TestIgnitePluginException>(() => target.InLongOutLong(-1, 0));
Assert.AreEqual("Baz", ex.Message);
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f115042..621bfa5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -962,6 +962,35 @@ namespace Apache.Ignite.Core.Impl
return GetPlatformTarget(DoOutOpObject(type));
}
+ /** <inheritdoc /> */
+ public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null,
+ Func<IBinaryRawReader, T> readAction = null)
+ {
+ var convertFunc = readAction != null
+ ? r => readAction(r)
+ : (Func<BinaryReader, T>) null;
+
+ return GetFuture((futId, futType) =>
+ {
+ using (var stream = IgniteManager.Memory.Allocate().GetStream())
+ {
+ stream.WriteLong(futId);
+ stream.WriteInt(futType);
+
+ if (writeAction != null)
+ {
+ var writer = _marsh.StartMarshal(stream);
+
+ writeAction(writer);
+
+ FinishMarshal(writer);
+ }
+
+ UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput());
+ }
+ }, false, convertFunc).Task;
+ }
+
/// <summary>
/// Gets the platform target.
/// </summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index a6a3a31..289589f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -149,6 +149,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")]
public static extern void* TargetOutObject(void* ctx, void* target, int opType);
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetInStreamAsync")]
+ public static extern void TargetInStreamAsync(void* ctx, void* target, int opType, long memPtr);
+
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")]
public static extern void* Acquire(void* ctx, void* target);
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 90e5230..986972f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -464,6 +464,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
return target.ChangeTarget(res);
}
+ internal static void TargetInStreamAsync(IUnmanagedTarget target, int opType, long memPtr)
+ {
+ JNI.TargetInStreamAsync(target.Context, target.Target, opType, memPtr);
+ }
+
#endregion
#region NATIVE METHODS: MISCELANNEOUS
http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
index 8b8963f..e8f8bfb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
@@ -18,7 +18,9 @@
namespace Apache.Ignite.Core.Interop
{
using System;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Impl.Binary;
/// <summary>
/// Interface to interoperate with
@@ -87,5 +89,16 @@ namespace Apache.Ignite.Core.Interop
/// <param name="type">Operation type code.</param>
/// <returns>Result.</returns>
IPlatformTarget OutObject(int type);
+
+ /// <summary>
+ /// Performs asynchronous operation.
+ /// </summary>
+ /// <typeparam name="T">Result type</typeparam>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action (can be null).</param>
+ /// <param name="readAction">Read function (can be null).</param>
+ /// <returns>Task.</returns>
+ Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction,
+ Func<IBinaryRawReader, T> readAction);
}
}