You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/15 11:20:16 UTC

ignite git commit: IGNITE-1662: Renamed IMessageFilter to IMessageListener.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1282 81feb9596 -> 2a77dd3a7


IGNITE-1662: Renamed IMessageFilter to IMessageListener.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a77dd3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a77dd3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a77dd3a

Branch: refs/heads/ignite-1282
Commit: 2a77dd3a7f01208d7172b66f6520cfc0e6615570
Parents: 81feb95
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Oct 15 12:20:58 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 12:20:58 2015 +0300

----------------------------------------------------------------------
 .../Apache.Ignite.Core.Tests/EventsTest.cs      |   2 +-
 .../IgniteStartStopTest.cs                      |   4 +-
 .../Apache.Ignite.Core.Tests/MessagingTest.cs   |  16 +-
 .../Apache.Ignite.Core.csproj                   |   4 +-
 .../dotnet/Apache.Ignite.Core/Events/IEvents.cs |   2 +-
 .../Impl/Common/DelegateTypeDescriptor.cs       |  16 +-
 .../Apache.Ignite.Core/Impl/Events/Events.cs    |   2 +-
 .../Impl/Messaging/MessageFilterHolder.cs       | 177 -------------------
 .../Impl/Messaging/MessageListenerHolder.cs     | 177 +++++++++++++++++++
 .../Impl/Messaging/Messaging.cs                 |  22 +--
 .../Impl/Portable/PortableMarshaller.cs         |   2 +-
 .../Impl/Portable/PortableUtils.cs              |   2 +-
 .../Impl/Unmanaged/UnmanagedCallbacks.cs        |   4 +-
 .../Messaging/IMessageFilter.cs                 |  35 ----
 .../Messaging/IMessageListener.cs               |  38 ++++
 .../Apache.Ignite.Core/Messaging/IMessaging.cs  |  15 +-
 .../Events/LocalListener.cs                     |   2 +-
 .../Messaging/LocalListener.cs                  |   2 +-
 .../Messaging/RemoteOrderedListener.cs          |   2 +-
 .../Messaging/RemoteUnorderedListener.cs        |   2 +-
 20 files changed, 266 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
index c271aa6..b325d36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
@@ -383,7 +383,7 @@ namespace Apache.Ignite.Core.Tests
             var expectedType = EventType.JobStarted;
 
             var remoteFilter = portable 
-                ?  (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType) 
+                ?  (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
                 :  new RemoteEventFilter(expectedType);
 
             var localListener = EventsTestHelper.GetListener();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index bd776ce..d16063f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -384,7 +384,7 @@ namespace Apache.Ignite.Core.Tests
             // to test race conditions during processor init on remote node
             var listenTask = Task.Factory.StartNew(() =>
             {
-                var filter = new MessageFilter();
+                var filter = new MessageListener();
 
                 while (!token.IsCancellationRequested)
                 {
@@ -410,7 +410,7 @@ namespace Apache.Ignite.Core.Tests
         /// Noop message filter.
         /// </summary>
         [Serializable]
-        private class MessageFilter : IMessageFilter<int>
+        private class MessageListener : IMessageListener<int>
         {
             /** <inheritdoc /> */
             public bool Invoke(Guid nodeId, int message)

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
index 95e48d3..55f2e6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
@@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests
         {
             var grid3GotMessage = false;
 
-            var grid3Listener = new MessageFilter<string>((id, x) =>
+            var grid3Listener = new MessageListener<string>((id, x) =>
             {
                 grid3GotMessage = true;
                 return true;
@@ -199,7 +199,7 @@ namespace Apache.Ignite.Core.Tests
 
             var sharedReceived = 0;
 
-            var sharedListener = new MessageFilter<string>((id, x) =>
+            var sharedListener = new MessageListener<string>((id, x) =>
             {
                 Interlocked.Increment(ref sharedReceived);
                 Thread.MemoryBarrier();
@@ -220,7 +220,7 @@ namespace Apache.Ignite.Core.Tests
                 var localReceived = 0;
                 var stopLocal = 0;
 
-                var localListener = new MessageFilter<string>((id, x) =>
+                var localListener = new MessageListener<string>((id, x) =>
                 {
                     Interlocked.Increment(ref localReceived);
                     Thread.MemoryBarrier();
@@ -569,9 +569,9 @@ namespace Apache.Ignite.Core.Tests
         /// Gets the message listener.
         /// </summary>
         /// <returns>New instance of message listener.</returns>
-        public static IMessageFilter<string> GetListener()
+        public static IMessageListener<string> GetListener()
         {
-            return new MessageFilter<string>(Listen);
+            return new MessageListener<string>(Listen);
         }
 
         /// <summary>
@@ -616,7 +616,7 @@ namespace Apache.Ignite.Core.Tests
     /// Test message filter.
     /// </summary>
     [Serializable]
-    public class MessageFilter<T> : IMessageFilter<T>
+    public class MessageListener<T> : IMessageListener<T>
     {
         /** */
         private readonly Func<Guid, T, bool> _invoke;
@@ -628,10 +628,10 @@ namespace Apache.Ignite.Core.Tests
         #pragma warning restore 649
 
         /// <summary>
-        /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class.
+        /// Initializes a new instance of the <see cref="MessageListener{T}"/> class.
         /// </summary>
         /// <param name="invoke">The invoke delegate.</param>
-        public MessageFilter(Func<Guid, T, bool> invoke)
+        public MessageListener(Func<Guid, T, bool> invoke)
         {
             _invoke = invoke;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 848ce49..a10a0a5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -238,7 +238,7 @@
     <Compile Include="Impl\Memory\PlatformPooledMemory.cs" />
     <Compile Include="Impl\Memory\PlatformRawMemory.cs" />
     <Compile Include="Impl\Memory\PlatformUnpooledMemory.cs" />
-    <Compile Include="Impl\Messaging\MessageFilterHolder.cs" />
+    <Compile Include="Impl\Messaging\MessageListenerHolder.cs" />
     <Compile Include="Impl\Messaging\Messaging.cs" />
     <Compile Include="Impl\Messaging\MessagingAsync.cs" />
     <Compile Include="Impl\NativeMethods.cs" />
@@ -307,7 +307,7 @@
     <Compile Include="Impl\Unmanaged\UnmanagedUtils.cs" />
     <Compile Include="Lifecycle\ILifecycleBean.cs" />
     <Compile Include="Lifecycle\LifecycleEventType.cs" />
-    <Compile Include="Messaging\IMessageFilter.cs" />
+    <Compile Include="Messaging\IMessageListener.cs" />
     <Compile Include="Messaging\IMessaging.cs" />
     <Compile Include="Portable\IPortableBuilder.cs" />
     <Compile Include="Portable\IPortableIdMapper.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
index be38104..b2f07d4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Events
         /// </returns>
         [AsyncSupported]
         Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
-            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) 
+            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
             where T : IEvent;
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 8b97884..0f2b3c1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -58,7 +58,7 @@ namespace Apache.Ignite.Core.Impl.Common
             _cacheEntryProcessor;
 
         /** */
-        private readonly Func<object, Guid, object, bool> _messageFilter;
+        private readonly Func<object, Guid, object, bool> _messageLsnr;
 
         /** */
         private readonly Func<object, object> _computeJobExecute;
@@ -136,13 +136,13 @@ namespace Apache.Ignite.Core.Impl.Common
         }
 
         /// <summary>
-        /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+        /// Gets the <see cref="IMessageListener{T}" /> invocator.
         /// </summary>
         /// <param name="type">Type.</param>
         /// <returns>Precompiled invocator delegate.</returns>
-        public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+        public static Func<object, Guid, object, bool> GetMessageListener(Type type)
         {
-            return Get(type)._messageFilter;
+            return Get(type)._messageLsnr;
         }
 
         /// <summary>
@@ -286,18 +286,18 @@ namespace Apache.Ignite.Core.Impl.Common
                     _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
                         new[] {iface});
                 }
-                else if (genericTypeDefinition == typeof (IMessageFilter<>))
+                else if (genericTypeDefinition == typeof (IMessageListener<>))
                 {
-                    ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+                    ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IMessageListener<>));
 
                     var arg = iface.GetGenericArguments()[0];
 
-                    _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+                    _messageLsnr = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
                         new[] { typeof(Guid), arg }, new[] { false, true, false });
                 }
                 else if (genericTypeDefinition == typeof (IComputeJob<>))
                 {
-                    ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>));
+                    ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IComputeJob<>));
 
                     _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0], 
                         methodName: "Execute");

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index f4cc341..08936e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -164,7 +164,7 @@ namespace Apache.Ignite.Core.Impl.Events
 
         /** <inheritDoc /> */
         public Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
-            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null) 
+            IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
             where T : IEvent
         {
             return RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, TypesToArray(types));

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
deleted file mode 100644
index 8666e9b..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Messaging
-{
-    using System;
-    using System.Diagnostics;
-    using Apache.Ignite.Core.Impl.Common;
-    using Apache.Ignite.Core.Impl.Handle;
-    using Apache.Ignite.Core.Impl.Portable;
-    using Apache.Ignite.Core.Impl.Portable.IO;
-    using Apache.Ignite.Core.Impl.Resource;
-    using Apache.Ignite.Core.Messaging;
-    using Apache.Ignite.Core.Portable;
-
-    /// <summary>
-    /// Non-generic portable filter wrapper.
-    /// </summary>
-    internal class MessageFilterHolder : IPortableWriteAware, IHandle
-    {
-        /** Invoker function that takes key and value and invokes wrapped IMessageFilter */
-        private readonly Func<Guid, object, bool> _invoker;
-
-        /** Current Ignite instance. */
-        private readonly Ignite _ignite;
-        
-        /** Underlying filter. */
-        private readonly object _filter;
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="MessageFilterHolder" /> class.
-        /// </summary>
-        /// <param name="grid">Grid.</param>
-        /// <param name="filter">The <see cref="IMessageFilter{T}" /> to wrap.</param>
-        /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageFilter.</param>
-        private MessageFilterHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
-        {
-            Debug.Assert(filter != null);
-            Debug.Assert(invoker != null);
-
-            _invoker = invoker;
-
-            _filter = filter;
-
-            // 1. Set fields.
-            Debug.Assert(grid != null);
-
-            _ignite = grid;
-            _invoker = invoker;
-
-            // 2. Perform injections.
-            ResourceProcessor.Inject(filter, grid);
-        }
-
-        /// <summary>
-        /// Invoke the filter.
-        /// </summary>
-        /// <param name="input">Input.</param>
-        /// <returns></returns>
-        public int Invoke(IPortableStream input)
-        {
-            var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
-
-            var nodeId = rawReader.ReadGuid();
-
-            Debug.Assert(nodeId != null);
-
-            return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
-        }
-
-        /// <summary>
-        /// Wrapped <see cref="IMessageFilter{T}" />.
-        /// </summary>
-        public object Filter
-        {
-            get { return _filter; }
-        }
-
-        /// <summary>
-        /// Destroy callback.
-        /// </summary>
-        public Action DestroyAction { private get; set; }
-
-        /** <inheritDoc /> */
-        public void Release()
-        {
-            if (DestroyAction != null)
-                DestroyAction();
-        }
-
-        /** <inheritDoc /> */
-        public bool Released
-        {
-            get { return false; } // Multiple releases are allowed.
-        }
-
-        /// <summary>
-        /// Creates local holder instance.
-        /// </summary>
-        /// <param name="grid">Ignite instance.</param>
-        /// <param name="filter">Filter.</param>
-        /// <returns>
-        /// New instance of <see cref="MessageFilterHolder" />
-        /// </returns>
-        public static MessageFilterHolder CreateLocal<T>(Ignite grid, IMessageFilter<T> filter)
-        {
-            Debug.Assert(filter != null);
-
-            return new MessageFilterHolder(grid, filter, (id, msg) => filter.Invoke(id, (T)msg));
-        }
-
-        /// <summary>
-        /// Creates remote holder instance.
-        /// </summary>
-        /// <param name="grid">Grid.</param>
-        /// <param name="memPtr">Memory pointer.</param>
-        /// <returns>Deserialized instance of <see cref="MessageFilterHolder"/></returns>
-        public static MessageFilterHolder CreateRemote(Ignite grid, long memPtr)
-        {
-            Debug.Assert(grid != null);
-
-            using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
-            {
-                return grid.Marshaller.Unmarshal<MessageFilterHolder>(stream);
-            }
-        }
-
-        /// <summary>
-        /// Gets the invoker func.
-        /// </summary>
-        private static Func<Guid, object, bool> GetInvoker(object pred)
-        {
-            var func = DelegateTypeDescriptor.GetMessageFilter(pred.GetType());
-
-            return (id, msg) => func(pred, id, msg);
-        }
-
-        /** <inheritdoc /> */
-        public void WritePortable(IPortableWriter writer)
-        {
-            var writer0 = (PortableWriterImpl)writer.GetRawWriter();
-
-            writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
-        }
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="MessageFilterHolder"/> class.
-        /// </summary>
-        /// <param name="reader">The reader.</param>
-        public MessageFilterHolder(IPortableReader reader)
-        {
-            var reader0 = (PortableReaderImpl)reader.GetRawReader();
-
-            _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
-            _invoker = GetInvoker(_filter);
-
-            _ignite = reader0.Marshaller.Ignite;
-
-            ResourceProcessor.Inject(_filter, _ignite);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
new file mode 100644
index 0000000..412a84e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Messaging
+{
+    using System;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Handle;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Messaging;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// Non-generic portable message listener wrapper.
+    /// </summary>
+    internal class MessageListenerHolder : IPortableWriteAware, IHandle
+    {
+        /** Invoker function that takes key and value and invokes wrapped IMessageListener */
+        private readonly Func<Guid, object, bool> _invoker;
+
+        /** Current Ignite instance. */
+        private readonly Ignite _ignite;
+        
+        /** Underlying filter. */
+        private readonly object _filter;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="MessageListenerHolder" /> class.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="filter">The <see cref="IMessageListener{T}" /> to wrap.</param>
+        /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageListener.</param>
+        private MessageListenerHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
+        {
+            Debug.Assert(filter != null);
+            Debug.Assert(invoker != null);
+
+            _invoker = invoker;
+
+            _filter = filter;
+
+            // 1. Set fields.
+            Debug.Assert(grid != null);
+
+            _ignite = grid;
+            _invoker = invoker;
+
+            // 2. Perform injections.
+            ResourceProcessor.Inject(filter, grid);
+        }
+
+        /// <summary>
+        /// Invoke the filter.
+        /// </summary>
+        /// <param name="input">Input.</param>
+        /// <returns></returns>
+        public int Invoke(IPortableStream input)
+        {
+            var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
+
+            var nodeId = rawReader.ReadGuid();
+
+            Debug.Assert(nodeId != null);
+
+            return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
+        }
+
+        /// <summary>
+        /// Wrapped <see cref="IMessageListener{T}" />.
+        /// </summary>
+        public object Filter
+        {
+            get { return _filter; }
+        }
+
+        /// <summary>
+        /// Destroy callback.
+        /// </summary>
+        public Action DestroyAction { private get; set; }
+
+        /** <inheritDoc /> */
+        public void Release()
+        {
+            if (DestroyAction != null)
+                DestroyAction();
+        }
+
+        /** <inheritDoc /> */
+        public bool Released
+        {
+            get { return false; } // Multiple releases are allowed.
+        }
+
+        /// <summary>
+        /// Creates local holder instance.
+        /// </summary>
+        /// <param name="grid">Ignite instance.</param>
+        /// <param name="listener">Filter.</param>
+        /// <returns>
+        /// New instance of <see cref="MessageListenerHolder" />
+        /// </returns>
+        public static MessageListenerHolder CreateLocal<T>(Ignite grid, IMessageListener<T> listener)
+        {
+            Debug.Assert(listener != null);
+
+            return new MessageListenerHolder(grid, listener, (id, msg) => listener.Invoke(id, (T)msg));
+        }
+
+        /// <summary>
+        /// Creates remote holder instance.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="memPtr">Memory pointer.</param>
+        /// <returns>Deserialized instance of <see cref="MessageListenerHolder"/></returns>
+        public static MessageListenerHolder CreateRemote(Ignite grid, long memPtr)
+        {
+            Debug.Assert(grid != null);
+
+            using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+            {
+                return grid.Marshaller.Unmarshal<MessageListenerHolder>(stream);
+            }
+        }
+
+        /// <summary>
+        /// Gets the invoker func.
+        /// </summary>
+        private static Func<Guid, object, bool> GetInvoker(object pred)
+        {
+            var func = DelegateTypeDescriptor.GetMessageListener(pred.GetType());
+
+            return (id, msg) => func(pred, id, msg);
+        }
+
+        /** <inheritdoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var writer0 = (PortableWriterImpl)writer.GetRawWriter();
+
+            writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="MessageListenerHolder"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public MessageListenerHolder(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl)reader.GetRawReader();
+
+            _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+            _invoker = GetInvoker(_filter);
+
+            _ignite = reader0.Marshaller.Ignite;
+
+            ResourceProcessor.Inject(_filter, _ignite);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 8170a91..4ccbc3e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -113,17 +113,17 @@ namespace Apache.Ignite.Core.Impl.Messaging
         }
 
         /** <inheritdoc /> */
-        public void LocalListen<T>(IMessageFilter<T> filter, object topic = null)
+        public void LocalListen<T>(IMessageListener<T> listener, object topic = null)
         {
-            IgniteArgumentCheck.NotNull(filter, "filter");
+            IgniteArgumentCheck.NotNull(listener, "filter");
 
-            ResourceProcessor.Inject(filter, _ignite);
+            ResourceProcessor.Inject(listener, _ignite);
 
             lock (_funcMap)
             {
-                var key = GetKey(filter, topic);
+                var key = GetKey(listener, topic);
 
-                MessageFilterHolder filter0 = MessageFilterHolder.CreateLocal(_ignite, filter); 
+                MessageListenerHolder filter0 = MessageListenerHolder.CreateLocal(_ignite, listener); 
 
                 var filterHnd = _ignite.HandleRegistry.Allocate(filter0);
 
@@ -155,16 +155,16 @@ namespace Apache.Ignite.Core.Impl.Messaging
         }
 
         /** <inheritdoc /> */
-        public void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null)
+        public void StopLocalListen<T>(IMessageListener<T> listener, object topic = null)
         {
-            IgniteArgumentCheck.NotNull(filter, "filter");
+            IgniteArgumentCheck.NotNull(listener, "filter");
 
             long filterHnd;
             bool removed;
 
             lock (_funcMap)
             {
-                removed = _funcMap.TryRemove(GetKey(filter, topic), out filterHnd);
+                removed = _funcMap.TryRemove(GetKey(listener, topic), out filterHnd);
             }
 
             if (removed)
@@ -178,11 +178,11 @@ namespace Apache.Ignite.Core.Impl.Messaging
         }
 
         /** <inheritdoc /> */
-        public Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null)
+        public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null)
         {
-            IgniteArgumentCheck.NotNull(filter, "filter");
+            IgniteArgumentCheck.NotNull(listener, "filter");
 
-            var filter0 = MessageFilterHolder.CreateLocal(_ignite, filter);
+            var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
             var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
 
             try

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
index 67d8f2b..6499946 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
@@ -516,7 +516,7 @@ namespace Apache.Ignite.Core.Impl.Portable
             AddSystemType(PortableUtils.TypeSerializableHolder, w => new SerializableObjectHolder(w));
             AddSystemType(PortableUtils.TypeCacheEntryProcessorHolder, w => new CacheEntryProcessorHolder(w));
             AddSystemType(PortableUtils.TypeCacheEntryPredicateHolder, w => new CacheEntryFilterHolder(w));
-            AddSystemType(PortableUtils.TypeMessageFilterHolder, w => new MessageFilterHolder(w));
+            AddSystemType(PortableUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w));
             AddSystemType(PortableUtils.TypePortableOrSerializableHolder, w => new PortableOrSerializableObjectHolder(w));
             AddSystemType(PortableUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
index f80a199..c7be496 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
@@ -200,7 +200,7 @@ namespace Apache.Ignite.Core.Impl.Portable
         public const byte TypeProductLicense = 78;
 
         /** Type: message filter holder. */
-        public const byte TypeMessageFilterHolder = 92;
+        public const byte TypeMessageListenerHolder = 92;
 
         /** Type: message filter holder. */
         public const byte TypePortableOrSerializableHolder = 93;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index f9949f3..3295904 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -788,7 +788,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         {
             return SafeCall(() =>
             {
-                MessageFilterHolder holder = MessageFilterHolder.CreateRemote(_ignite, memPtr);
+                MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr);
 
                 return _ignite.HandleRegistry.AllocateSafe(holder);
             });
@@ -798,7 +798,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         {
             return SafeCall(() =>
             {
-                var holder = _ignite.HandleRegistry.Get<MessageFilterHolder>(ptr, false);
+                var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
                 
                 if (holder == null)
                     return 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
deleted file mode 100644
index 456c5e6..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Messaging
-{
-    using System;
-
-    /// <summary>
-    /// Represents messaging filter predicate.
-    /// </summary>
-    public interface IMessageFilter<in T>
-    {
-        /// <summary>
-        /// Returns a value indicating whether provided message and node id satisfy this predicate.
-        /// </summary>
-        /// <param name="nodeId">Node identifier.</param>
-        /// <param name="message">Message.</param>
-        /// <returns>Value indicating whether provided message and node id satisfy this predicate.</returns>
-        bool Invoke(Guid nodeId, T message);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
new file mode 100644
index 0000000..393a670
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Messaging
+{
+    using System;
+
+    /// <summary>
+    /// Represents messaging filter predicate.
+    /// </summary>
+    public interface IMessageListener<in T>
+    {
+        /// <summary>
+        /// Invokes the message listener when a message arrives.
+        /// </summary>
+        /// <param name="nodeId">Message source node identifier.</param>
+        /// <param name="message">Message.</param>
+        /// <returns>
+        /// Value indicating whether this instance should remain subscribed. 
+        /// Returning <c>false</c> will unsubscribe this message listener from further notifications.
+        /// </returns>
+        bool Invoke(Guid nodeId, T message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
index 96f46b9..f846745 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
@@ -67,19 +67,19 @@ namespace Apache.Ignite.Core.Messaging
         /// node within the cluster group will send a message for a given topic to this node. Local listen
         /// subscription will happen regardless of whether local node belongs to this cluster group or not.
         /// </summary>
-        /// <param name="filter">
+        /// <param name="listener">
         /// Predicate that is called on each received message. If predicate returns false,
         /// then it will be unsubscribed from any further notifications.
         /// </param>
         /// <param name="topic">Topic to subscribe to.</param>
-        void LocalListen<T>(IMessageFilter<T> filter, object topic = null);
+        void LocalListen<T>(IMessageListener<T> listener, object topic = null);
 
         /// <summary>
         /// Unregisters local listener for given topic on local node only.
         /// </summary>
-        /// <param name="filter">Listener predicate.</param>
+        /// <param name="listener">Listener predicate.</param>
         /// <param name="topic">Topic to unsubscribe from.</param>
-        void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null);
+        void StopLocalListen<T>(IMessageListener<T> listener, object topic = null);
 
         /// <summary>
         /// Adds a message listener for a given topic to all nodes in the cluster group (possibly including
@@ -87,13 +87,16 @@ namespace Apache.Ignite.Core.Messaging
         /// group can send a message for a given topic and all nodes within the cluster group will receive
         /// listener notifications.
         /// </summary>
-        /// <param name="filter">Listener predicate.</param>
+        /// <param name="listener">
+        /// Predicate that is called on each received message. If predicate returns false,
+        /// then it will be unsubscribed from any further notifications.
+        /// </param>
         /// <param name="topic">Topic to unsubscribe from.</param>
         /// <returns>
         /// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening.
         /// </returns>
         [AsyncSupported]
-        Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null);
+        Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null);
 
         /// <summary>
         /// Unregisters all listeners identified with provided operation ID on all nodes in the cluster group.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
index 5cdb20c..067bd2a 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
index 7659bb4..591d426 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
@@ -24,7 +24,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
     /// <summary>
     /// Local message listener which signals countdown event on each received message.
     /// </summary>
-    public class LocalListener : IMessageFilter<int>
+    public class LocalListener : IMessageListener<int>
     {
         /** Countdown event. */
         private readonly CountdownEvent _countdown;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
index 8ae5ac1..85538c2 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
     /// Listener for Ordered topic.
     /// </summary>
     [Serializable]
-    public class RemoteOrderedListener : IMessageFilter<int>
+    public class RemoteOrderedListener : IMessageListener<int>
     {
         /** Injected Ignite instance. */
         [InstanceResource]

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
index 166dbd6..ab23e8b 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
     /// Listener for Unordered topic.
     /// </summary>
     [Serializable]
-    public class RemoteUnorderedListener : IMessageFilter<int>
+    public class RemoteUnorderedListener : IMessageListener<int>
     {
         /** Injected Ignite instance. */
         [InstanceResource]