You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/15 11:20:16 UTC
ignite git commit: IGNITE-1662: Renamed IMessageFilter to
IMessageListener.
Repository: ignite
Updated Branches:
refs/heads/ignite-1282 81feb9596 -> 2a77dd3a7
IGNITE-1662: Renamed IMessageFilter to IMessageListener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a77dd3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a77dd3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a77dd3a
Branch: refs/heads/ignite-1282
Commit: 2a77dd3a7f01208d7172b66f6520cfc0e6615570
Parents: 81feb95
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Oct 15 12:20:58 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 12:20:58 2015 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.Tests/EventsTest.cs | 2 +-
.../IgniteStartStopTest.cs | 4 +-
.../Apache.Ignite.Core.Tests/MessagingTest.cs | 16 +-
.../Apache.Ignite.Core.csproj | 4 +-
.../dotnet/Apache.Ignite.Core/Events/IEvents.cs | 2 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 16 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 2 +-
.../Impl/Messaging/MessageFilterHolder.cs | 177 -------------------
.../Impl/Messaging/MessageListenerHolder.cs | 177 +++++++++++++++++++
.../Impl/Messaging/Messaging.cs | 22 +--
.../Impl/Portable/PortableMarshaller.cs | 2 +-
.../Impl/Portable/PortableUtils.cs | 2 +-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 4 +-
.../Messaging/IMessageFilter.cs | 35 ----
.../Messaging/IMessageListener.cs | 38 ++++
.../Apache.Ignite.Core/Messaging/IMessaging.cs | 15 +-
.../Events/LocalListener.cs | 2 +-
.../Messaging/LocalListener.cs | 2 +-
.../Messaging/RemoteOrderedListener.cs | 2 +-
.../Messaging/RemoteUnorderedListener.cs | 2 +-
20 files changed, 266 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
index c271aa6..b325d36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
@@ -383,7 +383,7 @@ namespace Apache.Ignite.Core.Tests
var expectedType = EventType.JobStarted;
var remoteFilter = portable
- ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
+ ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
: new RemoteEventFilter(expectedType);
var localListener = EventsTestHelper.GetListener();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index bd776ce..d16063f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -384,7 +384,7 @@ namespace Apache.Ignite.Core.Tests
// to test race conditions during processor init on remote node
var listenTask = Task.Factory.StartNew(() =>
{
- var filter = new MessageFilter();
+ var filter = new MessageListener();
while (!token.IsCancellationRequested)
{
@@ -410,7 +410,7 @@ namespace Apache.Ignite.Core.Tests
/// Noop message filter.
/// </summary>
[Serializable]
- private class MessageFilter : IMessageFilter<int>
+ private class MessageListener : IMessageListener<int>
{
/** <inheritdoc /> */
public bool Invoke(Guid nodeId, int message)
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
index 95e48d3..55f2e6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
@@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests
{
var grid3GotMessage = false;
- var grid3Listener = new MessageFilter<string>((id, x) =>
+ var grid3Listener = new MessageListener<string>((id, x) =>
{
grid3GotMessage = true;
return true;
@@ -199,7 +199,7 @@ namespace Apache.Ignite.Core.Tests
var sharedReceived = 0;
- var sharedListener = new MessageFilter<string>((id, x) =>
+ var sharedListener = new MessageListener<string>((id, x) =>
{
Interlocked.Increment(ref sharedReceived);
Thread.MemoryBarrier();
@@ -220,7 +220,7 @@ namespace Apache.Ignite.Core.Tests
var localReceived = 0;
var stopLocal = 0;
- var localListener = new MessageFilter<string>((id, x) =>
+ var localListener = new MessageListener<string>((id, x) =>
{
Interlocked.Increment(ref localReceived);
Thread.MemoryBarrier();
@@ -569,9 +569,9 @@ namespace Apache.Ignite.Core.Tests
/// Gets the message listener.
/// </summary>
/// <returns>New instance of message listener.</returns>
- public static IMessageFilter<string> GetListener()
+ public static IMessageListener<string> GetListener()
{
- return new MessageFilter<string>(Listen);
+ return new MessageListener<string>(Listen);
}
/// <summary>
@@ -616,7 +616,7 @@ namespace Apache.Ignite.Core.Tests
/// Test message filter.
/// </summary>
[Serializable]
- public class MessageFilter<T> : IMessageFilter<T>
+ public class MessageListener<T> : IMessageListener<T>
{
/** */
private readonly Func<Guid, T, bool> _invoke;
@@ -628,10 +628,10 @@ namespace Apache.Ignite.Core.Tests
#pragma warning restore 649
/// <summary>
- /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class.
+ /// Initializes a new instance of the <see cref="MessageListener{T}"/> class.
/// </summary>
/// <param name="invoke">The invoke delegate.</param>
- public MessageFilter(Func<Guid, T, bool> invoke)
+ public MessageListener(Func<Guid, T, bool> invoke)
{
_invoke = invoke;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 848ce49..a10a0a5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -238,7 +238,7 @@
<Compile Include="Impl\Memory\PlatformPooledMemory.cs" />
<Compile Include="Impl\Memory\PlatformRawMemory.cs" />
<Compile Include="Impl\Memory\PlatformUnpooledMemory.cs" />
- <Compile Include="Impl\Messaging\MessageFilterHolder.cs" />
+ <Compile Include="Impl\Messaging\MessageListenerHolder.cs" />
<Compile Include="Impl\Messaging\Messaging.cs" />
<Compile Include="Impl\Messaging\MessagingAsync.cs" />
<Compile Include="Impl\NativeMethods.cs" />
@@ -307,7 +307,7 @@
<Compile Include="Impl\Unmanaged\UnmanagedUtils.cs" />
<Compile Include="Lifecycle\ILifecycleBean.cs" />
<Compile Include="Lifecycle\LifecycleEventType.cs" />
- <Compile Include="Messaging\IMessageFilter.cs" />
+ <Compile Include="Messaging\IMessageListener.cs" />
<Compile Include="Messaging\IMessaging.cs" />
<Compile Include="Portable\IPortableBuilder.cs" />
<Compile Include="Portable\IPortableIdMapper.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
index be38104..b2f07d4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Events
/// </returns>
[AsyncSupported]
Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
where T : IEvent;
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 8b97884..0f2b3c1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -58,7 +58,7 @@ namespace Apache.Ignite.Core.Impl.Common
_cacheEntryProcessor;
/** */
- private readonly Func<object, Guid, object, bool> _messageFilter;
+ private readonly Func<object, Guid, object, bool> _messageLsnr;
/** */
private readonly Func<object, object> _computeJobExecute;
@@ -136,13 +136,13 @@ namespace Apache.Ignite.Core.Impl.Common
}
/// <summary>
- /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+ /// Gets the <see cref="IMessageListener{T}" /> invocator.
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Precompiled invocator delegate.</returns>
- public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+ public static Func<object, Guid, object, bool> GetMessageListener(Type type)
{
- return Get(type)._messageFilter;
+ return Get(type)._messageLsnr;
}
/// <summary>
@@ -286,18 +286,18 @@ namespace Apache.Ignite.Core.Impl.Common
_streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
new[] {iface});
}
- else if (genericTypeDefinition == typeof (IMessageFilter<>))
+ else if (genericTypeDefinition == typeof (IMessageListener<>))
{
- ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+ ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IMessageListener<>));
var arg = iface.GetGenericArguments()[0];
- _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ _messageLsnr = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
new[] { typeof(Guid), arg }, new[] { false, true, false });
}
else if (genericTypeDefinition == typeof (IComputeJob<>))
{
- ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>));
+ ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IComputeJob<>));
_computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
methodName: "Execute");
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index f4cc341..08936e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -164,7 +164,7 @@ namespace Apache.Ignite.Core.Impl.Events
/** <inheritDoc /> */
public Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
where T : IEvent
{
return RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, TypesToArray(types));
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
deleted file mode 100644
index 8666e9b..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Messaging
-{
- using System;
- using System.Diagnostics;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Handle;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Portable.IO;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Messaging;
- using Apache.Ignite.Core.Portable;
-
- /// <summary>
- /// Non-generic portable filter wrapper.
- /// </summary>
- internal class MessageFilterHolder : IPortableWriteAware, IHandle
- {
- /** Invoker function that takes key and value and invokes wrapped IMessageFilter */
- private readonly Func<Guid, object, bool> _invoker;
-
- /** Current Ignite instance. */
- private readonly Ignite _ignite;
-
- /** Underlying filter. */
- private readonly object _filter;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="MessageFilterHolder" /> class.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="filter">The <see cref="IMessageFilter{T}" /> to wrap.</param>
- /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageFilter.</param>
- private MessageFilterHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
- {
- Debug.Assert(filter != null);
- Debug.Assert(invoker != null);
-
- _invoker = invoker;
-
- _filter = filter;
-
- // 1. Set fields.
- Debug.Assert(grid != null);
-
- _ignite = grid;
- _invoker = invoker;
-
- // 2. Perform injections.
- ResourceProcessor.Inject(filter, grid);
- }
-
- /// <summary>
- /// Invoke the filter.
- /// </summary>
- /// <param name="input">Input.</param>
- /// <returns></returns>
- public int Invoke(IPortableStream input)
- {
- var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
-
- var nodeId = rawReader.ReadGuid();
-
- Debug.Assert(nodeId != null);
-
- return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
- }
-
- /// <summary>
- /// Wrapped <see cref="IMessageFilter{T}" />.
- /// </summary>
- public object Filter
- {
- get { return _filter; }
- }
-
- /// <summary>
- /// Destroy callback.
- /// </summary>
- public Action DestroyAction { private get; set; }
-
- /** <inheritDoc /> */
- public void Release()
- {
- if (DestroyAction != null)
- DestroyAction();
- }
-
- /** <inheritDoc /> */
- public bool Released
- {
- get { return false; } // Multiple releases are allowed.
- }
-
- /// <summary>
- /// Creates local holder instance.
- /// </summary>
- /// <param name="grid">Ignite instance.</param>
- /// <param name="filter">Filter.</param>
- /// <returns>
- /// New instance of <see cref="MessageFilterHolder" />
- /// </returns>
- public static MessageFilterHolder CreateLocal<T>(Ignite grid, IMessageFilter<T> filter)
- {
- Debug.Assert(filter != null);
-
- return new MessageFilterHolder(grid, filter, (id, msg) => filter.Invoke(id, (T)msg));
- }
-
- /// <summary>
- /// Creates remote holder instance.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="memPtr">Memory pointer.</param>
- /// <returns>Deserialized instance of <see cref="MessageFilterHolder"/></returns>
- public static MessageFilterHolder CreateRemote(Ignite grid, long memPtr)
- {
- Debug.Assert(grid != null);
-
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return grid.Marshaller.Unmarshal<MessageFilterHolder>(stream);
- }
- }
-
- /// <summary>
- /// Gets the invoker func.
- /// </summary>
- private static Func<Guid, object, bool> GetInvoker(object pred)
- {
- var func = DelegateTypeDescriptor.GetMessageFilter(pred.GetType());
-
- return (id, msg) => func(pred, id, msg);
- }
-
- /** <inheritdoc /> */
- public void WritePortable(IPortableWriter writer)
- {
- var writer0 = (PortableWriterImpl)writer.GetRawWriter();
-
- writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="MessageFilterHolder"/> class.
- /// </summary>
- /// <param name="reader">The reader.</param>
- public MessageFilterHolder(IPortableReader reader)
- {
- var reader0 = (PortableReaderImpl)reader.GetRawReader();
-
- _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
- _invoker = GetInvoker(_filter);
-
- _ignite = reader0.Marshaller.Ignite;
-
- ResourceProcessor.Inject(_filter, _ignite);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
new file mode 100644
index 0000000..412a84e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Messaging
+{
+ using System;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Handle;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Messaging;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Non-generic portable message listener wrapper.
+ /// </summary>
+ internal class MessageListenerHolder : IPortableWriteAware, IHandle
+ {
+ /** Invoker function that takes key and value and invokes wrapped IMessageListener */
+ private readonly Func<Guid, object, bool> _invoker;
+
+ /** Current Ignite instance. */
+ private readonly Ignite _ignite;
+
+ /** Underlying filter. */
+ private readonly object _filter;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="MessageListenerHolder" /> class.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="filter">The <see cref="IMessageListener{T}" /> to wrap.</param>
+ /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageListener.</param>
+ private MessageListenerHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
+ {
+ Debug.Assert(filter != null);
+ Debug.Assert(invoker != null);
+
+ _invoker = invoker;
+
+ _filter = filter;
+
+ // 1. Set fields.
+ Debug.Assert(grid != null);
+
+ _ignite = grid;
+ _invoker = invoker;
+
+ // 2. Perform injections.
+ ResourceProcessor.Inject(filter, grid);
+ }
+
+ /// <summary>
+ /// Invoke the filter.
+ /// </summary>
+ /// <param name="input">Input.</param>
+ /// <returns></returns>
+ public int Invoke(IPortableStream input)
+ {
+ var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
+
+ var nodeId = rawReader.ReadGuid();
+
+ Debug.Assert(nodeId != null);
+
+ return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
+ }
+
+ /// <summary>
+ /// Wrapped <see cref="IMessageListener{T}" />.
+ /// </summary>
+ public object Filter
+ {
+ get { return _filter; }
+ }
+
+ /// <summary>
+ /// Destroy callback.
+ /// </summary>
+ public Action DestroyAction { private get; set; }
+
+ /** <inheritDoc /> */
+ public void Release()
+ {
+ if (DestroyAction != null)
+ DestroyAction();
+ }
+
+ /** <inheritDoc /> */
+ public bool Released
+ {
+ get { return false; } // Multiple releases are allowed.
+ }
+
+ /// <summary>
+ /// Creates local holder instance.
+ /// </summary>
+ /// <param name="grid">Ignite instance.</param>
+ /// <param name="listener">Filter.</param>
+ /// <returns>
+ /// New instance of <see cref="MessageListenerHolder" />
+ /// </returns>
+ public static MessageListenerHolder CreateLocal<T>(Ignite grid, IMessageListener<T> listener)
+ {
+ Debug.Assert(listener != null);
+
+ return new MessageListenerHolder(grid, listener, (id, msg) => listener.Invoke(id, (T)msg));
+ }
+
+ /// <summary>
+ /// Creates remote holder instance.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="memPtr">Memory pointer.</param>
+ /// <returns>Deserialized instance of <see cref="MessageListenerHolder"/></returns>
+ public static MessageListenerHolder CreateRemote(Ignite grid, long memPtr)
+ {
+ Debug.Assert(grid != null);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return grid.Marshaller.Unmarshal<MessageListenerHolder>(stream);
+ }
+ }
+
+ /// <summary>
+ /// Gets the invoker func.
+ /// </summary>
+ private static Func<Guid, object, bool> GetInvoker(object pred)
+ {
+ var func = DelegateTypeDescriptor.GetMessageListener(pred.GetType());
+
+ return (id, msg) => func(pred, id, msg);
+ }
+
+ /** <inheritdoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var writer0 = (PortableWriterImpl)writer.GetRawWriter();
+
+ writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="MessageListenerHolder"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public MessageListenerHolder(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl)reader.GetRawReader();
+
+ _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+ _invoker = GetInvoker(_filter);
+
+ _ignite = reader0.Marshaller.Ignite;
+
+ ResourceProcessor.Inject(_filter, _ignite);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 8170a91..4ccbc3e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -113,17 +113,17 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public void LocalListen<T>(IMessageFilter<T> filter, object topic = null)
+ public void LocalListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
- ResourceProcessor.Inject(filter, _ignite);
+ ResourceProcessor.Inject(listener, _ignite);
lock (_funcMap)
{
- var key = GetKey(filter, topic);
+ var key = GetKey(listener, topic);
- MessageFilterHolder filter0 = MessageFilterHolder.CreateLocal(_ignite, filter);
+ MessageListenerHolder filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
var filterHnd = _ignite.HandleRegistry.Allocate(filter0);
@@ -155,16 +155,16 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null)
+ public void StopLocalListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
long filterHnd;
bool removed;
lock (_funcMap)
{
- removed = _funcMap.TryRemove(GetKey(filter, topic), out filterHnd);
+ removed = _funcMap.TryRemove(GetKey(listener, topic), out filterHnd);
}
if (removed)
@@ -178,11 +178,11 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null)
+ public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
- var filter0 = MessageFilterHolder.CreateLocal(_ignite, filter);
+ var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
try
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
index 67d8f2b..6499946 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
@@ -516,7 +516,7 @@ namespace Apache.Ignite.Core.Impl.Portable
AddSystemType(PortableUtils.TypeSerializableHolder, w => new SerializableObjectHolder(w));
AddSystemType(PortableUtils.TypeCacheEntryProcessorHolder, w => new CacheEntryProcessorHolder(w));
AddSystemType(PortableUtils.TypeCacheEntryPredicateHolder, w => new CacheEntryFilterHolder(w));
- AddSystemType(PortableUtils.TypeMessageFilterHolder, w => new MessageFilterHolder(w));
+ AddSystemType(PortableUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w));
AddSystemType(PortableUtils.TypePortableOrSerializableHolder, w => new PortableOrSerializableObjectHolder(w));
AddSystemType(PortableUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
index f80a199..c7be496 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
@@ -200,7 +200,7 @@ namespace Apache.Ignite.Core.Impl.Portable
public const byte TypeProductLicense = 78;
/** Type: message filter holder. */
- public const byte TypeMessageFilterHolder = 92;
+ public const byte TypeMessageListenerHolder = 92;
/** Type: message filter holder. */
public const byte TypePortableOrSerializableHolder = 93;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index f9949f3..3295904 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -788,7 +788,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
return SafeCall(() =>
{
- MessageFilterHolder holder = MessageFilterHolder.CreateRemote(_ignite, memPtr);
+ MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr);
return _ignite.HandleRegistry.AllocateSafe(holder);
});
@@ -798,7 +798,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
return SafeCall(() =>
{
- var holder = _ignite.HandleRegistry.Get<MessageFilterHolder>(ptr, false);
+ var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
if (holder == null)
return 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
deleted file mode 100644
index 456c5e6..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Messaging
-{
- using System;
-
- /// <summary>
- /// Represents messaging filter predicate.
- /// </summary>
- public interface IMessageFilter<in T>
- {
- /// <summary>
- /// Returns a value indicating whether provided message and node id satisfy this predicate.
- /// </summary>
- /// <param name="nodeId">Node identifier.</param>
- /// <param name="message">Message.</param>
- /// <returns>Value indicating whether provided message and node id satisfy this predicate.</returns>
- bool Invoke(Guid nodeId, T message);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
new file mode 100644
index 0000000..393a670
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Messaging
+{
+ using System;
+
+ /// <summary>
+ /// Represents messaging filter predicate.
+ /// </summary>
+ public interface IMessageListener<in T>
+ {
+ /// <summary>
+ /// Invokes the message listener when a message arrives.
+ /// </summary>
+ /// <param name="nodeId">Message source node identifier.</param>
+ /// <param name="message">Message.</param>
+ /// <returns>
+ /// Value indicating whether this instance should remain subscribed.
+ /// Returning <c>false</c> will unsubscribe this message listener from further notifications.
+ /// </returns>
+ bool Invoke(Guid nodeId, T message);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
index 96f46b9..f846745 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
@@ -67,19 +67,19 @@ namespace Apache.Ignite.Core.Messaging
/// node within the cluster group will send a message for a given topic to this node. Local listen
/// subscription will happen regardless of whether local node belongs to this cluster group or not.
/// </summary>
- /// <param name="filter">
+ /// <param name="listener">
/// Predicate that is called on each received message. If predicate returns false,
/// then it will be unsubscribed from any further notifications.
/// </param>
/// <param name="topic">Topic to subscribe to.</param>
- void LocalListen<T>(IMessageFilter<T> filter, object topic = null);
+ void LocalListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Unregisters local listener for given topic on local node only.
/// </summary>
- /// <param name="filter">Listener predicate.</param>
+ /// <param name="listener">Listener predicate.</param>
/// <param name="topic">Topic to unsubscribe from.</param>
- void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null);
+ void StopLocalListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Adds a message listener for a given topic to all nodes in the cluster group (possibly including
@@ -87,13 +87,16 @@ namespace Apache.Ignite.Core.Messaging
/// group can send a message for a given topic and all nodes within the cluster group will receive
/// listener notifications.
/// </summary>
- /// <param name="filter">Listener predicate.</param>
+ /// <param name="listener">
+ /// Predicate that is called on each received message. If predicate returns false,
+ /// then it will be unsubscribed from any further notifications.
+ /// </param>
/// <param name="topic">Topic to unsubscribe from.</param>
/// <returns>
/// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening.
/// </returns>
[AsyncSupported]
- Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null);
+ Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Unregisters all listeners identified with provided operation ID on all nodes in the cluster group.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
index 5cdb20c..067bd2a 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
index 7659bb4..591d426 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
@@ -24,7 +24,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// <summary>
/// Local message listener which signals countdown event on each received message.
/// </summary>
- public class LocalListener : IMessageFilter<int>
+ public class LocalListener : IMessageListener<int>
{
/** Countdown event. */
private readonly CountdownEvent _countdown;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
index 8ae5ac1..85538c2 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// Listener for Ordered topic.
/// </summary>
[Serializable]
- public class RemoteOrderedListener : IMessageFilter<int>
+ public class RemoteOrderedListener : IMessageListener<int>
{
/** Injected Ignite instance. */
[InstanceResource]
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
index 166dbd6..ab23e8b 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// Listener for Unordered topic.
/// </summary>
[Serializable]
- public class RemoteUnorderedListener : IMessageFilter<int>
+ public class RemoteUnorderedListener : IMessageListener<int>
{
/** Injected Ignite instance. */
[InstanceResource]