You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/11 06:58:04 UTC

[34/50] [abbrv] ignite git commit: IGNITE-2228: .NET: Compute futures could be cancelled.

http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index 4a4f93b..0472ce4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
+    using System.Threading;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
@@ -289,6 +290,26 @@ namespace Apache.Ignite.Core.Impl
         }
 
         /// <summary>
+        /// Perform out operation.
+        /// </summary>
+        /// <param name="type">Operation type.</param>
+        /// <param name="action">Action to be performed on the stream.</param>
+        /// <returns></returns>
+        protected IUnmanagedTarget DoOutOpObject(int type, Action<BinaryWriter> action)
+        {
+            using (var stream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                var writer = _marsh.StartMarshal(stream);
+
+                action(writer);
+
+                FinishMarshal(writer);
+
+                return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput());
+            }
+        }
+
+        /// <summary>
         /// Perform simple output operation accepting single argument.
         /// </summary>
         /// <param name="type">Operation type.</param>
@@ -633,6 +654,37 @@ namespace Apache.Ignite.Core.Impl
         /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
         /// <param name="convertFunc">The function to read future result from stream.</param>
         /// <returns>Created future.</returns>
+        protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            var futType = FutureType.Object;
+
+            var type = typeof(T);
+
+            if (type.IsPrimitive)
+                IgniteFutureTypeMap.TryGetValue(type, out futType);
+
+            var fut = convertFunc == null && futType != FutureType.Object
+                ? new Future<T>()
+                : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
+
+            var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+            var futTarget = listenAction(futHnd, (int) futType);
+
+            fut.SetTarget(futTarget);
+
+            return fut;
+        }
+
+        /// <summary>
+        /// Creates a future and starts listening.
+        /// </summary>
+        /// <typeparam name="T">Future result type</typeparam>
+        /// <param name="listenAction">The listen action.</param>
+        /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+        /// <param name="convertFunc">The function to read future result from stream.</param>
+        /// <returns>Created future.</returns>
         protected Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
             Func<BinaryReader, T> convertFunc = null)
         {

http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index 860e703..5e54a4c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -113,6 +113,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")]
         public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId);
 
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")]
+        public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")]
+        public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId);
+
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAffinityPartitions")]
         public static extern int AffinityParts(void* ctx, void* target);
 
@@ -178,7 +184,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         public static extern void ComputeWithTimeout(void* ctx, void* target, long timeout);
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteComputeExecuteNative")]
-        public static extern void ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
+        public static extern void* ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteContinuousQueryClose")]
         public static extern void ContinuousQryClose(void* ctx, void* target);
@@ -354,5 +360,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicLongClose")]
         public static extern void AtomicLongClose(void* ctx, void* target);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")]
+        [return: MarshalAs(UnmanagedType.U1)]
+        public static extern bool ListenableCancel(void* ctx, void* target);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")]
+        [return: MarshalAs(UnmanagedType.U1)]
+        public static extern bool ListenableIsCancelled(void* ctx, void* target);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 56a184d..4c8f1dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -305,6 +305,21 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId);
         }
 
+        internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ)
+        {
+            var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ);
+
+            return target.ChangeTarget(res);
+        }
+
+        internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId,
+            int typ, int opId)
+        {
+            var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId);
+
+            return target.ChangeTarget(res);
+        }
+
         #endregion
 
         #region NATIVE METHODS: AFFINITY
@@ -440,9 +455,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.ComputeWithTimeout(target.Context, target.Target, timeout);
         }
 
-        internal static void ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
+        internal static IUnmanagedTarget ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
         {
-            JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+            void* res = JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+
+            return target.ChangeTarget(res);
         }
 
         #endregion
@@ -816,6 +833,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.AtomicLongClose(target.Context, target.Target);
         }
 
+        internal static bool ListenableCancel(IUnmanagedTarget target)
+        {
+            return JNI.ListenableCancel(target.Context, target.Target);
+        }
+
+        internal static bool ListenableIsCancelled(IUnmanagedTarget target)
+        {
+            return JNI.ListenableIsCancelled(target.Context, target.Target);
+        }
+
         #endregion
     }
 }