You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/03 21:23:44 UTC
[06/13] ignite git commit: IGNITE-2228: .NET: Compute futures could
be cancelled.
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index 4a4f93b..0472ce4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
+ using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
@@ -289,6 +290,26 @@ namespace Apache.Ignite.Core.Impl
}
/// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action to be performed on the stream.</param>
+ /// <returns></returns>
+ protected IUnmanagedTarget DoOutOpObject(int type, Action<BinaryWriter> action)
+ {
+ using (var stream = IgniteManager.Memory.Allocate().GetStream())
+ {
+ var writer = _marsh.StartMarshal(stream);
+
+ action(writer);
+
+ FinishMarshal(writer);
+
+ return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput());
+ }
+ }
+
+ /// <summary>
/// Perform simple output operation accepting single argument.
/// </summary>
/// <param name="type">Operation type.</param>
@@ -633,6 +654,37 @@ namespace Apache.Ignite.Core.Impl
/// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
/// <param name="convertFunc">The function to read future result from stream.</param>
/// <returns>Created future.</returns>
+ protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+ Func<BinaryReader, T> convertFunc = null)
+ {
+ var futType = FutureType.Object;
+
+ var type = typeof(T);
+
+ if (type.IsPrimitive)
+ IgniteFutureTypeMap.TryGetValue(type, out futType);
+
+ var fut = convertFunc == null && futType != FutureType.Object
+ ? new Future<T>()
+ : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
+
+ var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+ var futTarget = listenAction(futHnd, (int) futType);
+
+ fut.SetTarget(futTarget);
+
+ return fut;
+ }
+
+ /// <summary>
+ /// Creates a future and starts listening.
+ /// </summary>
+ /// <typeparam name="T">Future result type</typeparam>
+ /// <param name="listenAction">The listen action.</param>
+ /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+ /// <param name="convertFunc">The function to read future result from stream.</param>
+ /// <returns>Created future.</returns>
protected Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
Func<BinaryReader, T> convertFunc = null)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index 860e703..5e54a4c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -113,6 +113,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")]
public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId);
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")]
+ public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ);
+
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")]
+ public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId);
+
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAffinityPartitions")]
public static extern int AffinityParts(void* ctx, void* target);
@@ -178,7 +184,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
public static extern void ComputeWithTimeout(void* ctx, void* target, long timeout);
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteComputeExecuteNative")]
- public static extern void ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
+ public static extern void* ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteContinuousQueryClose")]
public static extern void ContinuousQryClose(void* ctx, void* target);
@@ -354,5 +360,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicLongClose")]
public static extern void AtomicLongClose(void* ctx, void* target);
+
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")]
+ [return: MarshalAs(UnmanagedType.U1)]
+ public static extern bool ListenableCancel(void* ctx, void* target);
+
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")]
+ [return: MarshalAs(UnmanagedType.U1)]
+ public static extern bool ListenableIsCancelled(void* ctx, void* target);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 56a184d..4c8f1dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -305,6 +305,21 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId);
}
+ internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ)
+ {
+ var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ);
+
+ return target.ChangeTarget(res);
+ }
+
+ internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId,
+ int typ, int opId)
+ {
+ var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId);
+
+ return target.ChangeTarget(res);
+ }
+
#endregion
#region NATIVE METHODS: AFFINITY
@@ -440,9 +455,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.ComputeWithTimeout(target.Context, target.Target, timeout);
}
- internal static void ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
+ internal static IUnmanagedTarget ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
{
- JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+ void* res = JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+
+ return target.ChangeTarget(res);
}
#endregion
@@ -816,6 +833,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.AtomicLongClose(target.Context, target.Target);
}
+ internal static bool ListenableCancel(IUnmanagedTarget target)
+ {
+ return JNI.ListenableCancel(target.Context, target.Target);
+ }
+
+ internal static bool ListenableIsCancelled(IUnmanagedTarget target)
+ {
+ return JNI.ListenableIsCancelled(target.Context, target.Target);
+ }
+
#endregion
}
}