You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/09 08:38:26 UTC

[12/50] [abbrv] ignite git commit: IGNITE-4074 Refactor async (*future) operations in PlatformTarget

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index 5d1add640..bc1b4bb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -53,10 +53,11 @@ namespace Apache.Ignite.Core.Impl.Events
             EnableLocal = 8,
             DisableLocal = 9,
             GetEnabledEvents = 10,
-            WithAsync = 11,
             IsEnabled = 12,
             LocalListen = 13,
-            StopLocalListen = 14
+            StopLocalListen = 14,
+            RemoteQueryAsync = 15,
+            WaitForLocalAsync = 16
         }
 
         /** Map from user func to local wrapper, needed for invoke/unsubscribe. */
@@ -66,9 +67,6 @@ namespace Apache.Ignite.Core.Impl.Events
         /** Cluster group. */
         private readonly IClusterGroup _clusterGroup;
         
-        /** Async instance. */
-        private readonly Lazy<Events> _asyncInstance;
-
         /// <summary>
         /// Initializes a new instance of the <see cref="Events" /> class.
         /// </summary>
@@ -81,17 +79,6 @@ namespace Apache.Ignite.Core.Impl.Events
             Debug.Assert(clusterGroup != null);
 
             _clusterGroup = clusterGroup;
-
-            _asyncInstance = new Lazy<Events>(() => new Events(this));
-        }
-
-        /// <summary>
-        /// Initializes a new async instance.
-        /// </summary>
-        /// <param name="events">The events.</param>
-        private Events(Events events) : base(UU.TargetOutObject(events.Target, (int) Op.WithAsync), events.Marshaller)
-        {
-            _clusterGroup = events.ClusterGroup;
         }
 
         /** <inheritDoc /> */
@@ -106,14 +93,6 @@ namespace Apache.Ignite.Core.Impl.Events
             get { return (Ignite) ClusterGroup.Ignite; }
         }
 
-        /// <summary>
-        /// Gets the asynchronous instance.
-        /// </summary>
-        private Events AsyncInstance
-        {
-            get { return _asyncInstance.Value; }
-        }
-
         /** <inheritDoc /> */
         public ICollection<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
             where T : IEvent
@@ -121,14 +100,7 @@ namespace Apache.Ignite.Core.Impl.Events
             IgniteArgumentCheck.NotNull(filter, "filter");
 
             return DoOutInOp((int) Op.RemoteQuery,
-                writer =>
-                {
-                    writer.Write(filter);
-
-                    writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds));
-
-                    WriteEventTypes(types, writer);
-                },
+                writer => WriteRemoteQuery(filter, timeout, types, writer),
                 reader => ReadEvents<T>(reader));
         }
 
@@ -136,11 +108,11 @@ namespace Apache.Ignite.Core.Impl.Events
         public Task<ICollection<T>> RemoteQueryAsync<T>(IEventFilter<T> filter, TimeSpan? timeout = null, 
             params int[] types) where T : IEvent
         {
-            AsyncInstance.RemoteQuery(filter, timeout, types);
+            IgniteArgumentCheck.NotNull(filter, "filter");
 
             // ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010)
-            return GetFuture<ICollection<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target, 
-                futId, futTyp, (int) Op.RemoteQuery), convertFunc: ReadEvents<T>).Task;
+            return DoOutOpAsync<ICollection<T>>((int) Op.RemoteQueryAsync,
+                w => WriteRemoteQuery(filter, timeout, types, w), convertFunc: ReadEvents<T>);
         }
 
         /** <inheritDoc /> */
@@ -234,46 +206,53 @@ namespace Apache.Ignite.Core.Impl.Events
         /** <inheritDoc /> */
         public T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
         {
-            long hnd = 0;
+            var hnd = GetFilterHandle(filter);
 
             try
             {
-                return WaitForLocal0(filter, ref hnd, types);
+                return DoOutInOp((int) Op.WaitForLocal,
+                    writer =>
+                    {
+                        writer.WriteObject(hnd);
+                        WriteEventTypes(types, writer);
+                    },
+                    reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
             }
             finally
             {
-                if (filter != null)
-                    Ignite.HandleRegistry.Release(hnd);
+                if (hnd != null)
+                    Ignite.HandleRegistry.Release(hnd.Value);
             }
         }
 
         /** <inheritDoc /> */
         public Task<T> WaitForLocalAsync<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
         {
-            long hnd = 0;
+            var hnd = GetFilterHandle(filter);
 
             try
             {
-                AsyncInstance.WaitForLocal0(filter, ref hnd, types);
-
-                // ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010)
-                var fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target, futId,
-                    futTyp, (int) Op.WaitForLocal), convertFunc: reader => (T) EventReader.Read<IEvent>(reader));
+                var task = DoOutOpAsync((int) Op.WaitForLocalAsync, writer =>
+                {
+                    writer.WriteObject(hnd);
+                    WriteEventTypes(types, writer);
+                }, convertFunc: EventReader.Read<T>);
 
-                if (filter != null)
+                if (hnd != null)
                 {
                     // Dispose handle as soon as future ends.
-                    fut.Task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd));
+                    task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd.Value));
                 }
 
-                return fut.Task;
+                return task;
             }
             catch (Exception)
             {
-                Ignite.HandleRegistry.Release(hnd);
+                if (hnd != null)
+                    Ignite.HandleRegistry.Release(hnd.Value);
+
                 throw;
             }
-
         }
 
         /** <inheritDoc /> */
@@ -392,38 +371,13 @@ namespace Apache.Ignite.Core.Impl.Events
         }
 
         /// <summary>
-        /// Waits for the specified events.
+        /// Gets the filter handle.
         /// </summary>
-        /// <typeparam name="T">Type of events.</typeparam>
-        /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param>
-        /// <param name="handle">The filter handle, if applicable.</param>
-        /// <param name="types">Types of the events to wait for. 
-        /// If not provided, all events will be passed to the filter.</param>
-        /// <returns>Ignite event.</returns>
-        private T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent
-        {
-            if (filter != null)
-                handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter
-                {
-                    InvokeFunc = stream => InvokeLocalFilter(stream, filter)
-                });
-
-            var hnd = handle;
-
-            return DoOutInOp((int)Op.WaitForLocal,
-                writer =>
-                {
-                    if (filter != null)
-                    {
-                        writer.WriteBoolean(true);
-                        writer.WriteLong(hnd);
-                    }
-                    else
-                        writer.WriteBoolean(false);
-
-                    WriteEventTypes(types, writer);
-                },
-                reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
+        private long? GetFilterHandle<T>(IEventFilter<T> filter) where T : IEvent
+        {
+            return filter != null 
+                ? Ignite.HandleRegistry.Allocate(new LocalEventFilter<T>(Marshaller, filter)) 
+                : (long?) null;
         }
 
         /// <summary>
@@ -559,20 +513,6 @@ namespace Apache.Ignite.Core.Impl.Events
         /// <param name="stream">The stream.</param>
         /// <param name="listener">The listener.</param>
         /// <returns>Filter invocation result.</returns>
-        private bool InvokeLocalFilter<T>(IBinaryStream stream, IEventFilter<T> listener) where T : IEvent
-        {
-            var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
-
-            return listener.Invoke(evt);
-        }
-
-        /// <summary>
-        /// Invokes local filter using data from specified stream.
-        /// </summary>
-        /// <typeparam name="T">Event object type.</typeparam>
-        /// <param name="stream">The stream.</param>
-        /// <param name="listener">The listener.</param>
-        /// <returns>Filter invocation result.</returns>
         private bool InvokeLocalListener<T>(IBinaryStream stream, IEventListener<T> listener) where T : IEvent
         {
             var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
@@ -614,17 +554,49 @@ namespace Apache.Ignite.Core.Impl.Events
         }
 
         /// <summary>
+        /// Writes the remote query.
+        /// </summary>
+        /// <param name="filter">The filter.</param>
+        /// <param name="timeout">The timeout.</param>
+        /// <param name="types">The types.</param>
+        /// <param name="writer">The writer.</param>
+        private static void WriteRemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout, int[] types, 
+            IBinaryRawWriter writer)
+            where T : IEvent
+        {
+            writer.WriteObject(filter);
+
+            writer.WriteLong((long)(timeout == null ? 0 : timeout.Value.TotalMilliseconds));
+
+            WriteEventTypes(types, writer);
+        }
+
+        /// <summary>
         /// Local user filter wrapper.
         /// </summary>
-        private class LocalEventFilter : IInteropCallback
+        private class LocalEventFilter<T> : IInteropCallback where T : IEvent
         {
             /** */
-            public Func<IBinaryStream, bool> InvokeFunc;
+            private readonly Marshaller _marshaller;
+
+            /** */
+            private readonly IEventFilter<T> _listener;
+
+            /// <summary>
+            /// Initializes a new instance of the <see cref="LocalEventFilter{T}"/> class.
+            /// </summary>
+            public LocalEventFilter(Marshaller marshaller, IEventFilter<T> listener)
+            {
+                _marshaller = marshaller;
+                _listener = listener;
+            }
 
             /** <inheritdoc /> */
             public int Invoke(IBinaryStream stream)
             {
-                return InvokeFunc(stream) ? 1 : 0;
+                var evt = EventReader.Read<T>(_marshaller.StartUnmarshal(stream));
+
+                return _listener.Invoke(evt) ? 1 : 0;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index a8e3075..79df470 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -485,7 +485,7 @@ namespace Apache.Ignite.Core.Impl
         /// </returns>
         public ICache<TK, TV> Cache<TK, TV>(IUnmanagedTarget nativeCache, bool keepBinary = false)
         {
-            return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false, false);
+            return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false);
         }
 
         /** <inheritdoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 2216d1a..1b43438 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Messaging
     using System.Diagnostics;
     using System.Linq;
     using System.Threading.Tasks;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Collections;
@@ -30,7 +31,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
     using Apache.Ignite.Core.Impl.Resource;
     using Apache.Ignite.Core.Impl.Unmanaged;
     using Apache.Ignite.Core.Messaging;
-    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
 
     /// <summary>
     /// Messaging functionality.
@@ -49,7 +49,8 @@ namespace Apache.Ignite.Core.Impl.Messaging
             SendOrdered = 5,
             StopLocalListen = 6,
             StopRemoteListen = 7,
-            WithAsync = 8
+            RemoteListenAsync = 9,
+            StopRemoteListenAsync = 10
         }
 
         /** Map from user (func+topic) -> id, needed for unsubscription. */
@@ -59,12 +60,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
         /** Grid */
         private readonly Ignite _ignite;
         
-        /** Async instance. */
-        private readonly Lazy<Messaging> _asyncInstance;
-
-        /** Async flag. */
-        private readonly bool _isAsync;
-
         /** Cluster group. */
         private readonly IClusterGroup _clusterGroup;
 
@@ -82,20 +77,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
             _clusterGroup = prj;
 
             _ignite = (Ignite) prj.Ignite;
-
-            _asyncInstance = new Lazy<Messaging>(() => new Messaging(this));
-        }
-
-        /// <summary>
-        /// Initializes a new async instance.
-        /// </summary>
-        /// <param name="messaging">The messaging.</param>
-        private Messaging(Messaging messaging) : base(
-            UU.TargetOutObject(messaging.Target, (int) Op.WithAsync), messaging.Marshaller)
-        {
-            _isAsync = true;
-            _ignite = messaging._ignite;
-            _clusterGroup = messaging.ClusterGroup;
         }
 
         /** <inheritdoc /> */
@@ -104,16 +85,7 @@ namespace Apache.Ignite.Core.Impl.Messaging
             get { return _clusterGroup; }
         }
 
-        /// <summary>
-        /// Gets the asynchronous instance.
-        /// </summary>
-        private Messaging AsyncInstance
-        {
-            get { return _asyncInstance.Value; }
-        }
-
         /** <inheritdoc /> */
-
         public void Send(object message, object topic = null)
         {
             IgniteArgumentCheck.NotNull(message, "message");
@@ -216,65 +188,28 @@ namespace Apache.Ignite.Core.Impl.Messaging
         /** <inheritdoc /> */
         public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null)
         {
-            IgniteArgumentCheck.NotNull(listener, "filter");
-
-            var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
-            var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
-
-            try
-            {
-                Guid id = Guid.Empty;
-
-                DoOutInOp((int) Op.RemoteListen,
-                    writer =>
-                    {
-                        writer.Write(filter0);
-                        writer.WriteLong(filterHnd);
-                        writer.Write(topic);
-                    },
-                    input =>
-                    {
-                        var id0 = Marshaller.StartUnmarshal(input).GetRawReader().ReadGuid();
-
-                        Debug.Assert(_isAsync || id0.HasValue);
-
-                        if (id0.HasValue)
-                            id = id0.Value;
-                    });
-
-                return id;
-            }
-            catch (Exception)
-            {
-                _ignite.HandleRegistry.Release(filterHnd);
-
-                throw;
-            }
+            return RemoteListen(listener, topic,
+                (writeAct, readAct) => DoOutInOp((int) Op.RemoteListen, writeAct,
+                    stream => readAct(Marshaller.StartUnmarshal(stream))));
         }
 
         /** <inheritdoc /> */
         public Task<Guid> RemoteListenAsync<T>(IMessageListener<T> listener, object topic = null)
         {
-            AsyncInstance.RemoteListen(listener, topic);
-
-            return AsyncInstance.GetTask<Guid>();
+            return RemoteListen(listener, topic,
+                (writeAct, readAct) => DoOutOpAsync((int) Op.RemoteListenAsync, writeAct, convertFunc: readAct));
         }
 
         /** <inheritdoc /> */
         public void StopRemoteListen(Guid opId)
         {
-            DoOutOp((int) Op.StopRemoteListen, writer =>
-            {
-                writer.WriteGuid(opId);
-            });
+            DoOutOp((int) Op.StopRemoteListen, writer => writer.WriteGuid(opId));
         }
 
         /** <inheritdoc /> */
         public Task StopRemoteListenAsync(Guid opId)
         {
-            AsyncInstance.StopRemoteListen(opId);
-
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync((int) Op.StopRemoteListenAsync, writer => writer.WriteGuid(opId));
         }
 
         /// <summary>
@@ -287,5 +222,33 @@ namespace Apache.Ignite.Core.Impl.Messaging
         {
             return new KeyValuePair<object, object>(filter, topic);
         }
+
+        /// <summary>
+        /// Registers the listener handle and starts remote listening through the specified invoker.
+        /// </summary>
+        private TRes RemoteListen<T, TRes>(IMessageListener<T> filter, object topic,
+            Func<Action<IBinaryRawWriter>, Func<BinaryReader, Guid>, TRes> invoker)
+        {
+            IgniteArgumentCheck.NotNull(filter, "filter");
+
+            var filter0 = MessageListenerHolder.CreateLocal(_ignite, filter);
+            var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
+
+            try
+            {
+                return invoker(writer =>
+                {
+                    writer.WriteObject(filter0);
+                    writer.WriteLong(filterHnd);
+                    writer.WriteObject(topic);
+                }, input => input.ReadGuid() ?? Guid.Empty);
+            }
+            catch (Exception)
+            {
+                _ignite.HandleRegistry.Release(filterHnd);
+
+                throw;
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f392830..d5b69a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
     using System.Threading.Tasks;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Binary.Metadata;
@@ -757,6 +758,102 @@ namespace Apache.Ignite.Core.Impl
 
         #endregion
 
+        #region Async operations
+
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        /// <param name="type">The type code.</param>
+        /// <param name="writeAction">The write action.</param>
+        /// <returns>Task for async operation</returns>
+        protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null)
+        {
+            return DoOutOpAsync<object>(type, writeAction);
+        }
+
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        /// <typeparam name="T">Type of the result.</typeparam>
+        /// <param name="type">The type code.</param>
+        /// <param name="writeAction">The write action.</param>
+        /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+        /// <param name="convertFunc">The function to read future result from stream.</param>
+        /// <returns>Task for async operation</returns>
+        protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            return GetFuture((futId, futType) => DoOutOp(type, w =>
+            {
+                if (writeAction != null)
+                    writeAction(w);
+                w.WriteLong(futId);
+                w.WriteInt(futType);
+            }), keepBinary, convertFunc).Task;
+        }
+
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        /// <typeparam name="T">Type of the result.</typeparam>
+        /// <param name="type">The type code.</param>
+        /// <param name="writeAction">The write action.</param>
+        /// <returns>Future for async operation</returns>
+        protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction)
+        {
+            return GetFuture<T>((futId, futType) => DoOutOpObject(type, w =>
+            {
+                writeAction(w);
+                w.WriteLong(futId);
+                w.WriteInt(futType);
+            }));
+        }
+
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        /// <typeparam name="TR">Type of the result.</typeparam>
+        /// <typeparam name="T1">The type of the first arg.</typeparam>
+        /// <param name="type">The type code.</param>
+        /// <param name="val1">First arg.</param>
+        /// <returns>
+        /// Task for async operation
+        /// </returns>
+        protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1)
+        {
+            return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+            {
+                w.WriteObject(val1);
+                w.WriteLong(futId);
+                w.WriteInt(futType);
+            })).Task;
+        }
+
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        /// <typeparam name="TR">Type of the result.</typeparam>
+        /// <typeparam name="T1">The type of the first arg.</typeparam>
+        /// <typeparam name="T2">The type of the second arg.</typeparam>
+        /// <param name="type">The type code.</param>
+        /// <param name="val1">First arg.</param>
+        /// <param name="val2">Second arg.</param>
+        /// <returns>
+        /// Task for async operation
+        /// </returns>
+        protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2)
+        {
+            return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+            {
+                w.WriteObject(val1);
+                w.WriteObject(val2);
+                w.WriteLong(futId);
+                w.WriteInt(futType);
+            })).Task;
+        }
+
+        #endregion
+
         #region Miscelanneous
 
         /// <summary>
@@ -846,7 +943,7 @@ namespace Apache.Ignite.Core.Impl
         /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
         /// <param name="convertFunc">The function to read future result from stream.</param>
         /// <returns>Created future.</returns>
-        protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+        private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
             Func<BinaryReader, T> convertFunc = null)
         {
             var futType = FutureType.Object;
@@ -862,7 +959,18 @@ namespace Apache.Ignite.Core.Impl
 
             var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
 
-            var futTarget = listenAction(futHnd, (int) futType);
+            IUnmanagedTarget futTarget;
+
+            try
+            {
+                futTarget = listenAction(futHnd, (int)futType);
+            }
+            catch (Exception)
+            {
+                _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+                throw;
+            }
 
             fut.SetTarget(futTarget);
 
@@ -893,25 +1001,18 @@ namespace Apache.Ignite.Core.Impl
 
             var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
 
-            listenAction(futHnd, (int)futType);
-
-            return fut;
-        }
+            try
+            {
+                listenAction(futHnd, (int)futType);
+            }
+            catch (Exception)
+            {
+                _marsh.Ignite.HandleRegistry.Release(futHnd);
 
-        /// <summary>
-        /// Creates a task to listen for the last async op.
-        /// </summary>
-        protected Task GetTask()
-        {
-            return GetTask<object>();
-        }
+                throw;
+            }
 
-        /// <summary>
-        /// Creates a task to listen for the last async op.
-        /// </summary>
-        protected Task<T> GetTask<T>()
-        {
-            return GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp)).Task;
+            return fut;
         }
 
         #endregion

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
index 8fc973b..9d9acfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
@@ -17,12 +17,12 @@
 
 namespace Apache.Ignite.Core.Impl.Services
 {
-    using System;
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Linq;
     using System.Reflection;
     using System.Threading.Tasks;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Common;
@@ -51,9 +51,6 @@ namespace Apache.Ignite.Core.Impl.Services
         private const int OpDescriptors = 5;
 
         /** */
-        private const int OpWithAsync = 6;
-
-        /** */
         private const int OpWithServerKeepBinary = 7;
 
         /** */
@@ -66,6 +63,18 @@ namespace Apache.Ignite.Core.Impl.Services
         private const int OpCancelAll = 10;
 
         /** */
+        private const int OpDeployAsync = 11;
+
+        /** */
+        private const int OpDeployMultipleAsync = 12;
+
+        /** */
+        private const int OpCancelAsync = 13;
+
+        /** */
+        private const int OpCancelAllAsync = 14;
+
+        /** */
         private readonly IClusterGroup _clusterGroup;
 
         /** Invoker binary flag. */
@@ -74,9 +83,6 @@ namespace Apache.Ignite.Core.Impl.Services
         /** Server binary flag. */
         private readonly bool _srvKeepBinary;
 
-        /** Async instance. */
-        private readonly Lazy<Services> _asyncInstance;
-
         /// <summary>
         /// Initializes a new instance of the <see cref="Services" /> class.
         /// </summary>
@@ -94,20 +100,6 @@ namespace Apache.Ignite.Core.Impl.Services
             _clusterGroup = clusterGroup;
             _keepBinary = keepBinary;
             _srvKeepBinary = srvKeepBinary;
-
-            _asyncInstance = new Lazy<Services>(() => new Services(this));
-        }
-
-        /// <summary>
-        /// Initializes a new async instance.
-        /// </summary>
-        /// <param name="services">The services.</param>
-        private Services(Services services) : base(UU.TargetOutObject(services.Target, OpWithAsync), 
-            services.Marshaller)
-        {
-            _clusterGroup = services.ClusterGroup;
-            _keepBinary = services._keepBinary;
-            _srvKeepBinary = services._srvKeepBinary;
         }
 
         /** <inheritDoc /> */
@@ -134,14 +126,6 @@ namespace Apache.Ignite.Core.Impl.Services
             get { return _clusterGroup; }
         }
 
-        /// <summary>
-        /// Gets the asynchronous instance.
-        /// </summary>
-        private Services AsyncInstance
-        {
-            get { return _asyncInstance.Value; }
-        }
-
         /** <inheritDoc /> */
         public void DeployClusterSingleton(string name, IService service)
         {
@@ -154,9 +138,10 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task DeployClusterSingletonAsync(string name, IService service)
         {
-            AsyncInstance.DeployClusterSingleton(name, service);
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
 
-            return AsyncInstance.GetTask();
+            return DeployMultipleAsync(name, service, 1, 1);
         }
 
         /** <inheritDoc /> */
@@ -171,9 +156,10 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task DeployNodeSingletonAsync(string name, IService service)
         {
-            AsyncInstance.DeployNodeSingleton(name, service);
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
 
-            return AsyncInstance.GetTask();
+            return DeployMultipleAsync(name, service, 0, 1);
         }
 
         /** <inheritDoc /> */
@@ -197,9 +183,19 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task DeployKeyAffinitySingletonAsync<TK>(string name, IService service, string cacheName, TK affinityKey)
         {
-            AsyncInstance.DeployKeyAffinitySingleton(name, service, cacheName, affinityKey);
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
+            IgniteArgumentCheck.NotNull(affinityKey, "affinityKey");
 
-            return AsyncInstance.GetTask();
+            return DeployAsync(new ServiceConfiguration
+            {
+                Name = name,
+                Service = service,
+                CacheName = cacheName,
+                AffinityKey = affinityKey,
+                TotalCount = 1,
+                MaxPerNodeCount = 1
+            });
         }
 
         /** <inheritDoc /> */
@@ -220,9 +216,16 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task DeployMultipleAsync(string name, IService service, int totalCount, int maxPerNodeCount)
         {
-            AsyncInstance.DeployMultiple(name, service, totalCount, maxPerNodeCount);
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
 
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(OpDeployMultipleAsync, w =>
+            {
+                w.WriteString(name);
+                w.WriteObject(service);
+                w.WriteInt(totalCount);
+                w.WriteInt(maxPerNodeCount);
+            });
         }
 
         /** <inheritDoc /> */
@@ -230,28 +233,15 @@ namespace Apache.Ignite.Core.Impl.Services
         {
             IgniteArgumentCheck.NotNull(configuration, "configuration");
 
-            DoOutOp(OpDeploy, w =>
-            {
-                w.WriteString(configuration.Name);
-                w.WriteObject(configuration.Service);
-                w.WriteInt(configuration.TotalCount);
-                w.WriteInt(configuration.MaxPerNodeCount);
-                w.WriteString(configuration.CacheName);
-                w.WriteObject(configuration.AffinityKey);
-
-                if (configuration.NodeFilter != null)
-                    w.WriteObject(configuration.NodeFilter);
-                else
-                    w.WriteObject<object>(null);
-            });
+            DoOutOp(OpDeploy, w => WriteServiceConfiguration(configuration, w));
         }
 
         /** <inheritDoc /> */
         public Task DeployAsync(ServiceConfiguration configuration)
         {
-            AsyncInstance.Deploy(configuration);
+            IgniteArgumentCheck.NotNull(configuration, "configuration");
 
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(OpDeployAsync, w => WriteServiceConfiguration(configuration, w));
         }
 
         /** <inheritDoc /> */
@@ -265,9 +255,9 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task CancelAsync(string name)
         {
-            AsyncInstance.Cancel(name);
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
 
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(OpCancelAsync, w => w.WriteString(name));
         }
 
         /** <inheritDoc /> */
@@ -279,9 +269,7 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public Task CancelAllAsync()
         {
-            AsyncInstance.CancelAll();
-
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(OpCancelAllAsync);
         }
 
         /** <inheritDoc /> */
@@ -391,5 +379,26 @@ namespace Apache.Ignite.Core.Impl.Services
                 writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args, platform),
                 stream => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary), proxy.Target);
         }
+
+        /// <summary>
+        /// Writes the service configuration.
+        /// </summary>
+        private static void WriteServiceConfiguration(ServiceConfiguration configuration, IBinaryRawWriter w)
+        {
+            Debug.Assert(configuration != null);
+            Debug.Assert(w != null);
+
+            w.WriteString(configuration.Name);
+            w.WriteObject(configuration.Service);
+            w.WriteInt(configuration.TotalCount);
+            w.WriteInt(configuration.MaxPerNodeCount);
+            w.WriteString(configuration.CacheName);
+            w.WriteObject(configuration.AffinityKey);
+
+            if (configuration.NodeFilter != null)
+                w.WriteObject(configuration.NodeFilter);
+            else
+                w.WriteObject<object>(null);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
index 796044d..7de9be1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Transactions
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Impl.Binary;
-    using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Unmanaged;
     using Apache.Ignite.Core.Transactions;
 
@@ -214,11 +213,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
         /// </summary>
         internal Task CommitAsync(TransactionImpl tx)
         {
-            return GetFuture<object>((futId, futTyp) => DoOutOp(OpCommitAsync, (IBinaryStream s) =>
-            {
-                s.WriteLong(tx.Id);
-                s.WriteLong(futId);
-            })).Task;
+            return DoOutOpAsync(OpCommitAsync, w => w.WriteLong(tx.Id));
         }
 
         /// <summary>
@@ -226,11 +221,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
         /// </summary>
         internal Task RollbackAsync(TransactionImpl tx)
         {
-            return GetFuture<object>((futId, futTyp) => DoOutOp(OpRollbackAsync, (IBinaryStream s) =>
-            {
-                s.WriteLong(tx.Id);
-                s.WriteLong(futId);
-            })).Task;
+            return DoOutOpAsync(OpRollbackAsync, w => w.WriteLong(tx.Id));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index c352f0c..6b867de 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -150,18 +150,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")]
         public static extern void* TargetOutObject(void* ctx, void* target, int opType);
 
-        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFuture")]
-        public static extern void TargetListenFut(void* ctx, void* target, long futId, int typ);
-
-        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")]
-        public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId);
-
-        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")]
-        public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ);
-
-        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")]
-        public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId);
-
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")]
         public static extern void* Acquire(void* ctx, void* target);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index b1d4ecd..36dc332 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -460,31 +460,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             return target.ChangeTarget(res);
         }
 
-        internal static void TargetListenFuture(IUnmanagedTarget target, long futId, int typ)
-        {
-            JNI.TargetListenFut(target.Context, target.Target, futId, typ);
-        }
-
-        internal static void TargetListenFutureForOperation(IUnmanagedTarget target, long futId, int typ, int opId)
-        {
-            JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId);
-        }
-
-        internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ)
-        {
-            var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ);
-
-            return target.ChangeTarget(res);
-        }
-
-        internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId,
-            int typ, int opId)
-        {
-            var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId);
-
-            return target.ChangeTarget(res);
-        }
-
         #endregion
 
         #region NATIVE METHODS: MISCELANNEOUS

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index ac065bc..72ce015 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -1,5 +1,6 @@
 \ufeff<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
 	<s:String x:Key="/Default/CodeInspection/CSharpLanguageProject/LanguageLevel/@EntryValue">CSharp50</s:String>
-	<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>	
+	<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean>
+	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 </wpf:ResourceDictionary>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
index cf9e287..9672abe 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
@@ -25,6 +25,7 @@
 	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticMemberInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticFieldInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VirtualMemberNeverOverriden_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
+	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 	
 	<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file