You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/15 14:52:47 UTC
[1/6] ignite git commit: IGNITE-1642: Renamed events.
Repository: ignite
Updated Branches:
refs/heads/ignite-1655 c1952acbf -> f12880664
IGNITE-1642: Renamed events.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52534c33
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52534c33
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52534c33
Branch: refs/heads/ignite-1655
Commit: 52534c33ab517981abb6356f550155bc8ae23761
Parents: 7c1ef1a
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Oct 14 13:30:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Oct 14 13:30:08 2015 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.Tests/EventsTest.cs | 74 ++--
.../Apache.Ignite.Core/Events/EventType.cs | 344 +++++++++----------
.../Events/EventsExample.cs | 4 +-
3 files changed, 206 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/52534c33/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
index 2c014e2..c271aa6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
@@ -104,19 +104,19 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(0, events.GetEnabledEvents().Count);
- Assert.IsFalse(EventType.EventsCache.Any(events.IsEnabled));
+ Assert.IsFalse(EventType.CacheAll.Any(events.IsEnabled));
- events.EnableLocal(EventType.EventsCache);
+ events.EnableLocal(EventType.CacheAll);
- Assert.AreEqual(EventType.EventsCache, events.GetEnabledEvents());
+ Assert.AreEqual(EventType.CacheAll, events.GetEnabledEvents());
- Assert.IsTrue(EventType.EventsCache.All(events.IsEnabled));
+ Assert.IsTrue(EventType.CacheAll.All(events.IsEnabled));
- events.EnableLocal(EventType.EventsTaskExecution);
+ events.EnableLocal(EventType.TaskExecutionAll);
- events.DisableLocal(EventType.EventsCache);
+ events.DisableLocal(EventType.CacheAll);
- Assert.AreEqual(EventType.EventsTaskExecution, events.GetEnabledEvents());
+ Assert.AreEqual(EventType.TaskExecutionAll, events.GetEnabledEvents());
}
/// <summary>
@@ -127,7 +127,7 @@ namespace Apache.Ignite.Core.Tests
{
var events = _grid1.GetEvents();
var listener = EventsTestHelper.GetListener();
- var eventType = EventType.EventsTaskExecution;
+ var eventType = EventType.TaskExecutionAll;
events.EnableLocal(eventType);
@@ -136,7 +136,7 @@ namespace Apache.Ignite.Core.Tests
CheckSend(3); // 3 events per task * 3 grids
// Check unsubscription for specific event
- events.StopLocalListen(listener, EventType.EventTaskReduced);
+ events.StopLocalListen(listener, EventType.TaskReduced);
CheckSend(2);
@@ -146,7 +146,7 @@ namespace Apache.Ignite.Core.Tests
CheckNoEvent();
// Check unsubscription by filter
- events.LocalListen(listener, EventType.EventTaskReduced);
+ events.LocalListen(listener, EventType.TaskReduced);
CheckSend();
@@ -166,7 +166,7 @@ namespace Apache.Ignite.Core.Tests
{
var events = _grid1.GetEvents();
var listener = EventsTestHelper.GetListener();
- var eventType = EventType.EventsTaskExecution;
+ var eventType = EventType.TaskExecutionAll;
events.EnableLocal(eventType);
@@ -234,7 +234,7 @@ namespace Apache.Ignite.Core.Tests
{
yield return new EventTestCase
{
- EventType = EventType.EventsCache,
+ EventType = EventType.CacheAll,
EventObjectType = typeof (CacheEvent),
GenerateEvent = g => g.GetCache<int, int>(null).Put(1, 1),
VerifyEvents = (e, g) => VerifyCacheEvents(e, g),
@@ -243,7 +243,7 @@ namespace Apache.Ignite.Core.Tests
yield return new EventTestCase
{
- EventType = EventType.EventsTaskExecution,
+ EventType = EventType.TaskExecutionAll,
EventObjectType = typeof (TaskEvent),
GenerateEvent = g => GenerateTaskEvent(g),
VerifyEvents = (e, g) => VerifyTaskEvents(e),
@@ -252,7 +252,7 @@ namespace Apache.Ignite.Core.Tests
yield return new EventTestCase
{
- EventType = EventType.EventsJobExecution,
+ EventType = EventType.JobExecutionAll,
EventObjectType = typeof (JobEvent),
GenerateEvent = g => GenerateTaskEvent(g),
EventCount = 9
@@ -260,7 +260,7 @@ namespace Apache.Ignite.Core.Tests
yield return new EventTestCase
{
- EventType = new[] {EventType.EventCacheQueryExecuted},
+ EventType = new[] {EventType.CacheQueryExecuted},
EventObjectType = typeof (CacheQueryExecutedEvent),
GenerateEvent = g => GenerateCacheQueryEvent(g),
EventCount = 1
@@ -268,7 +268,7 @@ namespace Apache.Ignite.Core.Tests
yield return new EventTestCase
{
- EventType = new[] { EventType.EventCacheQueryObjectRead },
+ EventType = new[] { EventType.CacheQueryObjectRead },
EventObjectType = typeof (CacheQueryReadEvent),
GenerateEvent = g => GenerateCacheQueryEvent(g),
EventCount = 1
@@ -284,7 +284,7 @@ namespace Apache.Ignite.Core.Tests
{
var events = _grid1.GetEvents();
- var eventType = EventType.EventsTaskExecution;
+ var eventType = EventType.TaskExecutionAll;
events.EnableLocal(eventType);
@@ -311,7 +311,7 @@ namespace Apache.Ignite.Core.Tests
if (async)
events = events.WithAsync();
- var eventType = EventType.EventsTaskExecution;
+ var eventType = EventType.TaskExecutionAll;
events.EnableLocal(eventType);
@@ -340,27 +340,27 @@ namespace Apache.Ignite.Core.Tests
waitTask.Wait(timeout);
// Event types
- waitTask = getWaitTask(() => events.WaitForLocal(EventType.EventTaskReduced));
+ waitTask = getWaitTask(() => events.WaitForLocal(EventType.TaskReduced));
Assert.IsTrue(waitTask.Wait(timeout));
Assert.IsInstanceOf(typeof(TaskEvent), waitTask.Result);
- Assert.AreEqual(EventType.EventTaskReduced, waitTask.Result.Type);
+ Assert.AreEqual(EventType.TaskReduced, waitTask.Result.Type);
// Filter
waitTask = getWaitTask(() => events.WaitForLocal(
- new EventFilter<IEvent>((g, e) => e.Type == EventType.EventTaskReduced)));
+ new EventFilter<IEvent>((g, e) => e.Type == EventType.TaskReduced)));
Assert.IsTrue(waitTask.Wait(timeout));
Assert.IsInstanceOf(typeof(TaskEvent), waitTask.Result);
- Assert.AreEqual(EventType.EventTaskReduced, waitTask.Result.Type);
+ Assert.AreEqual(EventType.TaskReduced, waitTask.Result.Type);
// Filter & types
waitTask = getWaitTask(() => events.WaitForLocal(
- new EventFilter<IEvent>((g, e) => e.Type == EventType.EventTaskReduced), EventType.EventTaskReduced));
+ new EventFilter<IEvent>((g, e) => e.Type == EventType.TaskReduced), EventType.TaskReduced));
Assert.IsTrue(waitTask.Wait(timeout));
Assert.IsInstanceOf(typeof(TaskEvent), waitTask.Result);
- Assert.AreEqual(EventType.EventTaskReduced, waitTask.Result.Type);
+ Assert.AreEqual(EventType.TaskReduced, waitTask.Result.Type);
}
/// <summary>
@@ -374,13 +374,13 @@ namespace Apache.Ignite.Core.Tests
{
foreach (var g in _grids)
{
- g.GetEvents().EnableLocal(EventType.EventsJobExecution);
- g.GetEvents().EnableLocal(EventType.EventsTaskExecution);
+ g.GetEvents().EnableLocal(EventType.JobExecutionAll);
+ g.GetEvents().EnableLocal(EventType.TaskExecutionAll);
}
var events = _grid1.GetEvents();
- var expectedType = EventType.EventJobStarted;
+ var expectedType = EventType.JobStarted;
var remoteFilter = portable
? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
@@ -401,7 +401,7 @@ namespace Apache.Ignite.Core.Tests
CheckSend(3, typeof(JobEvent), expectedType);
- _grid3.GetEvents().DisableLocal(EventType.EventsJobExecution);
+ _grid3.GetEvents().DisableLocal(EventType.JobExecutionAll);
CheckSend(2, typeof(JobEvent), expectedType);
@@ -435,11 +435,11 @@ namespace Apache.Ignite.Core.Tests
public void TestRemoteQuery([Values(true, false)] bool async)
{
foreach (var g in _grids)
- g.GetEvents().EnableLocal(EventType.EventsJobExecution);
+ g.GetEvents().EnableLocal(EventType.JobExecutionAll);
var events = _grid1.GetEvents();
- var eventFilter = new RemoteEventFilter(EventType.EventJobStarted);
+ var eventFilter = new RemoteEventFilter(EventType.JobStarted);
var oldEvents = events.RemoteQuery(eventFilter);
@@ -448,7 +448,7 @@ namespace Apache.Ignite.Core.Tests
GenerateTaskEvent();
- var remoteQuery = events.RemoteQuery(eventFilter, EventsTestHelper.Timeout, EventType.EventsJobExecution);
+ var remoteQuery = events.RemoteQuery(eventFilter, EventsTestHelper.Timeout, EventType.JobExecutionAll);
if (async)
{
@@ -461,7 +461,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(_grids.Length, qryResult.Count);
- Assert.IsTrue(qryResult.All(x => x.Type == EventType.EventJobStarted));
+ Assert.IsTrue(qryResult.All(x => x.Type == EventType.JobStarted));
}
/// <summary>
@@ -577,7 +577,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(locNode, evt.Node);
Assert.AreEqual("msg", evt.Message);
- Assert.AreEqual(EventType.EventSwapSpaceCleared, evt.Type);
+ Assert.AreEqual(EventType.SwapSpaceCleared, evt.Type);
Assert.IsNotNullOrEmpty(evt.Name);
Assert.AreNotEqual(Guid.Empty, evt.Id.GlobalId);
Assert.IsTrue((evt.Timestamp - DateTime.Now).TotalSeconds < 10);
@@ -596,7 +596,7 @@ namespace Apache.Ignite.Core.Tests
GenerateTaskEvent();
EventsTestHelper.VerifyReceive(repeat, eventObjectType ?? typeof (TaskEvent),
- eventType.Any() ? eventType : EventType.EventsTaskExecution);
+ eventType.Any() ? eventType : EventType.TaskExecutionAll);
}
/// <summary>
@@ -651,7 +651,7 @@ namespace Apache.Ignite.Core.Tests
// started, reduced, finished
Assert.AreEqual(
- new[] {EventType.EventTaskStarted, EventType.EventTaskReduced, EventType.EventTaskFinished},
+ new[] {EventType.TaskStarted, EventType.TaskReduced, EventType.TaskFinished},
e.Select(x => x.Type).ToArray());
}
@@ -687,12 +687,12 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(false, cacheEvent.HasOldValue);
Assert.AreEqual(null, cacheEvent.OldValue);
- if (cacheEvent.Type == EventType.EventCacheObjectPut)
+ if (cacheEvent.Type == EventType.CacheObjectPut)
{
Assert.AreEqual(true, cacheEvent.HasNewValue);
Assert.AreEqual(1, cacheEvent.NewValue);
}
- else if (cacheEvent.Type == EventType.EventCacheEntryCreated)
+ else if (cacheEvent.Type == EventType.CacheEntryCreated)
{
Assert.AreEqual(false, cacheEvent.HasNewValue);
Assert.AreEqual(null, cacheEvent.NewValue);
http://git-wip-us.apache.org/repos/asf/ignite/blob/52534c33/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventType.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventType.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventType.cs
index 1c56de2..56a1b83 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventType.cs
@@ -15,10 +15,12 @@
* limitations under the License.
*/
+// ReSharper disable ConvertToConstant.Global
+// ReSharper disable MemberCanBePrivate.Global
+// ReSharper disable UnusedMember.Global
namespace Apache.Ignite.Core.Events
{
using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reflection;
@@ -39,556 +41,544 @@ namespace Apache.Ignite.Core.Events
/// <summary>
/// Built-in event type: checkpoint was saved.
/// </summary>
- public static readonly int EventCheckpointSaved = 1;
+ public static readonly int CheckpointSaved = 1;
/// <summary>
/// Built-in event type: checkpoint was loaded.
/// </summary>
- public static readonly int EventCheckpointLoaded = 2;
+ public static readonly int CheckpointLoaded = 2;
/// <summary>
/// Built-in event type: checkpoint was removed. Reasons are: timeout expired, or or it was manually removed,
/// or it was automatically removed by the task session.
/// </summary>
- public static readonly int EventCheckpointRemoved = 3;
+ public static readonly int CheckpointRemoved = 3;
/// <summary>
/// Built-in event type: node joined topology. New node has been discovered and joined grid topology. Note that
/// even though a node has been discovered there could be a number of warnings in the log. In certain
/// situations Ignite doesn't prevent a node from joining but prints warning messages into the log.
/// </summary>
- public static readonly int EventNodeJoined = 10;
+ public static readonly int NodeJoined = 10;
/// <summary>
/// Built-in event type: node has normally left topology.
/// </summary>
- public static readonly int EventNodeLeft = 11;
+ public static readonly int NodeLeft = 11;
/// <summary>
/// Built-in event type: node failed. Ignite detected that node has presumably crashed and is considered
/// failed.
/// </summary>
- public static readonly int EventNodeFailed = 12;
+ public static readonly int NodeFailed = 12;
/// <summary>
/// Built-in event type: node metrics updated. Generated when node's metrics are updated. In most cases this
/// callback is invoked with every heartbeat received from a node (including local node).
/// </summary>
- public static readonly int EventNodeMetricsUpdated = 13;
+ public static readonly int NodeMetricsUpdated = 13;
/// <summary>
/// Built-in event type: local node segmented. Generated when node determines that it runs in invalid network
/// segment.
/// </summary>
- public static readonly int EventNodeSegmented = 14;
+ public static readonly int NodeSegmented = 14;
/// <summary>
/// Built-in event type: client node disconnected.
/// </summary>
- public static readonly int EventClientNodeDisconnected = 16;
+ public static readonly int ClientNodeDisconnected = 16;
/// <summary>
/// Built-in event type: client node reconnected.
/// </summary>
- public static readonly int EventClientNodeReconnected = 17;
+ public static readonly int ClientNodeReconnected = 17;
/// <summary>
/// Built-in event type: task started.
/// </summary>
- public static readonly int EventTaskStarted = 20;
+ public static readonly int TaskStarted = 20;
/// <summary>
/// Built-in event type: task finished. Task got finished. This event is triggered every time a task finished
/// without exception.
/// </summary>
- public static readonly int EventTaskFinished = 21;
+ public static readonly int TaskFinished = 21;
/// <summary>
/// Built-in event type: task failed. Task failed. This event is triggered every time a task finished with an
/// exception. Note that prior to this event, there could be other events recorded specific to the failure.
/// </summary>
- public static readonly int EventTaskFailed = 22;
+ public static readonly int TaskFailed = 22;
/// <summary>
/// Built-in event type: task timed out.
/// </summary>
- public static readonly int EventTaskTimedout = 23;
+ public static readonly int TaskTimedout = 23;
/// <summary>
/// Built-in event type: task session attribute set.
/// </summary>
- public static readonly int EventTaskSessionAttrSet = 24;
+ public static readonly int TaskSessionAttrSet = 24;
/// <summary>
/// Built-in event type: task reduced.
/// </summary>
- public static readonly int EventTaskReduced = 25;
+ public static readonly int TaskReduced = 25;
/// <summary>
/// Built-in event type: Ignite job was mapped in {@link org.apache.ignite.compute.ComputeTask#map(List, Object)}
/// method.
/// </summary>
- public static readonly int EventJobMapped = 40;
+ public static readonly int JobMapped = 40;
/// <summary>
/// Built-in event type: Ignite job result was received by {@link
/// org.apache.ignite.compute.ComputeTask#result(org.apache.ignite.compute.ComputeJobResult, List)} method.
/// </summary>
- public static readonly int EventJobResulted = 41;
+ public static readonly int JobResulted = 41;
/// <summary>
/// Built-in event type: Ignite job failed over.
/// </summary>
- public static readonly int EventJobFailedOver = 43;
+ public static readonly int JobFailedOver = 43;
/// <summary>
/// Built-in event type: Ignite job started.
/// </summary>
- public static readonly int EventJobStarted = 44;
+ public static readonly int JobStarted = 44;
/// <summary>
/// Built-in event type: Ignite job finished. Job has successfully completed and produced a result which from the
/// user perspective can still be either negative or positive.
/// </summary>
- public static readonly int EventJobFinished = 45;
+ public static readonly int JobFinished = 45;
/// <summary>
/// Built-in event type: Ignite job timed out.
/// </summary>
- public static readonly int EventJobTimedout = 46;
+ public static readonly int JobTimedout = 46;
/// <summary>
/// Built-in event type: Ignite job rejected during collision resolution.
/// </summary>
- public static readonly int EventJobRejected = 47;
+ public static readonly int JobRejected = 47;
/// <summary>
/// Built-in event type: Ignite job failed. Job has failed. This means that there was some error event during job
/// execution and job did not produce a result.
/// </summary>
- public static readonly int EventJobFailed = 48;
+ public static readonly int JobFailed = 48;
/// <summary>
/// Built-in event type: Ignite job queued. Job arrived for execution and has been queued (added to passive queue
/// during collision resolution).
/// </summary>
- public static readonly int EventJobQueued = 49;
+ public static readonly int JobQueued = 49;
/// <summary>
/// Built-in event type: Ignite job cancelled.
/// </summary>
- public static readonly int EventJobCancelled = 50;
+ public static readonly int JobCancelled = 50;
/// <summary>
/// Built-in event type: entry created.
/// </summary>
- public static readonly int EventCacheEntryCreated = 60;
+ public static readonly int CacheEntryCreated = 60;
/// <summary>
/// Built-in event type: entry destroyed.
/// </summary>
- public static readonly int EventCacheEntryDestroyed = 61;
+ public static readonly int CacheEntryDestroyed = 61;
/// <summary>
/// Built-in event type: entry evicted.
/// </summary>
- public static readonly int EventCacheEntryEvicted = 62;
+ public static readonly int CacheEntryEvicted = 62;
/// <summary>
/// Built-in event type: object put.
/// </summary>
- public static readonly int EventCacheObjectPut = 63;
+ public static readonly int CacheObjectPut = 63;
/// <summary>
/// Built-in event type: object read.
/// </summary>
- public static readonly int EventCacheObjectRead = 64;
+ public static readonly int CacheObjectRead = 64;
/// <summary>
/// Built-in event type: object removed.
/// </summary>
- public static readonly int EventCacheObjectRemoved = 65;
+ public static readonly int CacheObjectRemoved = 65;
/// <summary>
/// Built-in event type: object locked.
/// </summary>
- public static readonly int EventCacheObjectLocked = 66;
+ public static readonly int CacheObjectLocked = 66;
/// <summary>
/// Built-in event type: object unlocked.
/// </summary>
- public static readonly int EventCacheObjectUnlocked = 67;
+ public static readonly int CacheObjectUnlocked = 67;
/// <summary>
/// Built-in event type: cache object swapped from swap storage.
/// </summary>
- public static readonly int EventCacheObjectSwapped = 68;
+ public static readonly int CacheObjectSwapped = 68;
/// <summary>
/// Built-in event type: cache object unswapped from swap storage.
/// </summary>
- public static readonly int EventCacheObjectUnswapped = 69;
+ public static readonly int CacheObjectUnswapped = 69;
/// <summary>
/// Built-in event type: cache object was expired when reading it.
/// </summary>
- public static readonly int EventCacheObjectExpired = 70;
+ public static readonly int CacheObjectExpired = 70;
/// <summary>
/// Built-in event type: swap space data read.
/// </summary>
- public static readonly int EventSwapSpaceDataRead = 71;
+ public static readonly int SwapSpaceDataRead = 71;
/// <summary>
/// Built-in event type: swap space data stored.
/// </summary>
- public static readonly int EventSwapSpaceDataStored = 72;
+ public static readonly int SwapSpaceDataStored = 72;
/// <summary>
/// Built-in event type: swap space data removed.
/// </summary>
- public static readonly int EventSwapSpaceDataRemoved = 73;
+ public static readonly int SwapSpaceDataRemoved = 73;
/// <summary>
/// Built-in event type: swap space cleared.
/// </summary>
- public static readonly int EventSwapSpaceCleared = 74;
+ public static readonly int SwapSpaceCleared = 74;
/// <summary>
/// Built-in event type: swap space data evicted.
/// </summary>
- public static readonly int EventSwapSpaceDataEvicted = 75;
+ public static readonly int SwapSpaceDataEvicted = 75;
/// <summary>
/// Built-in event type: cache object stored in off-heap storage.
/// </summary>
- public static readonly int EventCacheObjectToOffheap = 76;
+ public static readonly int CacheObjectToOffheap = 76;
/// <summary>
/// Built-in event type: cache object moved from off-heap storage back into memory.
/// </summary>
- public static readonly int EventCacheObjectFromOffheap = 77;
+ public static readonly int CacheObjectFromOffheap = 77;
/// <summary>
/// Built-in event type: cache rebalance started.
/// </summary>
- public static readonly int EventCacheRebalanceStarted = 80;
+ public static readonly int CacheRebalanceStarted = 80;
/// <summary>
/// Built-in event type: cache rebalance stopped.
/// </summary>
- public static readonly int EventCacheRebalanceStopped = 81;
+ public static readonly int CacheRebalanceStopped = 81;
/// <summary>
/// Built-in event type: cache partition loaded.
/// </summary>
- public static readonly int EventCacheRebalancePartLoaded = 82;
+ public static readonly int CacheRebalancePartLoaded = 82;
/// <summary>
/// Built-in event type: cache partition unloaded.
/// </summary>
- public static readonly int EventCacheRebalancePartUnloaded = 83;
+ public static readonly int CacheRebalancePartUnloaded = 83;
/// <summary>
/// Built-in event type: cache entry rebalanced.
/// </summary>
- public static readonly int EventCacheRebalanceObjectLoaded = 84;
+ public static readonly int CacheRebalanceObjectLoaded = 84;
/// <summary>
/// Built-in event type: cache entry unloaded.
/// </summary>
- public static readonly int EventCacheRebalanceObjectUnloaded = 85;
+ public static readonly int CacheRebalanceObjectUnloaded = 85;
/// <summary>
/// Built-in event type: all nodes that hold partition left topology.
/// </summary>
- public static readonly int EventCacheRebalancePartDataLost = 86;
+ public static readonly int CacheRebalancePartDataLost = 86;
/// <summary>
/// Built-in event type: query executed.
/// </summary>
- public static readonly int EventCacheQueryExecuted = 96;
+ public static readonly int CacheQueryExecuted = 96;
/// <summary>
/// Built-in event type: query entry read.
/// </summary>
- public static readonly int EventCacheQueryObjectRead = 97;
+ public static readonly int CacheQueryObjectRead = 97;
/// <summary>
/// Built-in event type: cache started.
/// </summary>
- public static readonly int EventCacheStarted = 98;
+ public static readonly int CacheStarted = 98;
/// <summary>
/// Built-in event type: cache started.
/// </summary>
- public static readonly int EventCacheStopped = 99;
+ public static readonly int CacheStopped = 99;
/// <summary>
/// Built-in event type: cache nodes left.
/// </summary>
- public static readonly int EventCacheNodesLeft = 100;
+ public static readonly int CacheNodesLeft = 100;
/// <summary>
/// All events indicating an error or failure condition. It is convenient to use when fetching all events
/// indicating error or failure.
/// </summary>
- private static readonly ICollection<int> EventsError0 = new[]
+ private static readonly ICollection<int> ErrorAll0 = new[]
{
- EventJobTimedout,
- EventJobFailed,
- EventJobFailedOver,
- EventJobRejected,
- EventJobCancelled,
- EventTaskTimedout,
- EventTaskFailed,
- EventCacheRebalanceStarted,
- EventCacheRebalanceStopped
+ JobTimedout,
+ JobFailed,
+ JobFailedOver,
+ JobRejected,
+ JobCancelled,
+ TaskTimedout,
+ TaskFailed,
+ CacheRebalanceStarted,
+ CacheRebalanceStopped
}.AsReadOnly();
/// <summary>
- /// All discovery events except for <see cref="EventNodeMetricsUpdated" />. Subscription to <see
- /// cref="EventNodeMetricsUpdated" /> can generate massive amount of event processing in most cases is not
+ /// All discovery events except for <see cref="NodeMetricsUpdated" />. Subscription to <see
+ /// cref="NodeMetricsUpdated" /> can generate massive amount of event processing in most cases is not
/// necessary. If this event is indeed required you can subscribe to it individually or use <see
- /// cref="EventsDiscoveryAll0" /> array.
+ /// cref="DiscoveryAll0" /> array.
/// </summary>
- private static readonly ICollection<int> EventsDiscovery0 = new[]
+ private static readonly ICollection<int> DiscoveryAllMinusMetrics0 = new[]
{
- EventNodeJoined,
- EventNodeLeft,
- EventNodeFailed,
- EventNodeSegmented,
- EventClientNodeDisconnected,
- EventClientNodeReconnected
+ NodeJoined,
+ NodeLeft,
+ NodeFailed,
+ NodeSegmented,
+ ClientNodeDisconnected,
+ ClientNodeReconnected
}.AsReadOnly();
/// <summary>
/// All discovery events.
/// </summary>
- private static readonly ICollection<int> EventsDiscoveryAll0 = new[]
+ private static readonly ICollection<int> DiscoveryAll0 = new[]
{
- EventNodeJoined,
- EventNodeLeft,
- EventNodeFailed,
- EventNodeSegmented,
- EventNodeMetricsUpdated,
- EventClientNodeDisconnected,
- EventClientNodeReconnected
+ NodeJoined,
+ NodeLeft,
+ NodeFailed,
+ NodeSegmented,
+ NodeMetricsUpdated,
+ ClientNodeDisconnected,
+ ClientNodeReconnected
}.AsReadOnly();
/// <summary>
/// All Ignite job execution events.
/// </summary>
- private static readonly ICollection<int> EventsJobExecution0 = new[]
+ private static readonly ICollection<int> JobExecutionAll0 = new[]
{
- EventJobMapped,
- EventJobResulted,
- EventJobFailedOver,
- EventJobStarted,
- EventJobFinished,
- EventJobTimedout,
- EventJobRejected,
- EventJobFailed,
- EventJobQueued,
- EventJobCancelled
+ JobMapped,
+ JobResulted,
+ JobFailedOver,
+ JobStarted,
+ JobFinished,
+ JobTimedout,
+ JobRejected,
+ JobFailed,
+ JobQueued,
+ JobCancelled
}.AsReadOnly();
/// <summary>
/// All Ignite task execution events.
/// </summary>
- private static readonly ICollection<int> EventsTaskExecution0 = new[]
+ private static readonly ICollection<int> TaskExecutionAll0 = new[]
{
- EventTaskStarted,
- EventTaskFinished,
- EventTaskFailed,
- EventTaskTimedout,
- EventTaskSessionAttrSet,
- EventTaskReduced
+ TaskStarted,
+ TaskFinished,
+ TaskFailed,
+ TaskTimedout,
+ TaskSessionAttrSet,
+ TaskReduced
}.AsReadOnly();
/// <summary>
/// All cache events.
/// </summary>
- private static readonly ICollection<int> EventsCache0 = new[]
+ private static readonly ICollection<int> CacheAll0 = new[]
{
- EventCacheEntryCreated,
- EventCacheEntryDestroyed,
- EventCacheObjectPut,
- EventCacheObjectRead,
- EventCacheObjectRemoved,
- EventCacheObjectLocked,
- EventCacheObjectUnlocked,
- EventCacheObjectSwapped,
- EventCacheObjectUnswapped,
- EventCacheObjectExpired
+ CacheEntryCreated,
+ CacheEntryDestroyed,
+ CacheObjectPut,
+ CacheObjectRead,
+ CacheObjectRemoved,
+ CacheObjectLocked,
+ CacheObjectUnlocked,
+ CacheObjectSwapped,
+ CacheObjectUnswapped,
+ CacheObjectExpired
}.AsReadOnly();
/// <summary>
/// All cache rebalance events.
/// </summary>
- private static readonly ICollection<int> EventsCacheRebalance0 = new[]
+ private static readonly ICollection<int> CacheRebalanceAll0 = new[]
{
- EventCacheRebalanceStarted,
- EventCacheRebalanceStopped,
- EventCacheRebalancePartLoaded,
- EventCacheRebalancePartUnloaded,
- EventCacheRebalanceObjectLoaded,
- EventCacheRebalanceObjectUnloaded,
- EventCacheRebalancePartDataLost
+ CacheRebalanceStarted,
+ CacheRebalanceStopped,
+ CacheRebalancePartLoaded,
+ CacheRebalancePartUnloaded,
+ CacheRebalanceObjectLoaded,
+ CacheRebalanceObjectUnloaded,
+ CacheRebalancePartDataLost
}.AsReadOnly();
/// <summary>
/// All cache lifecycle events.
/// </summary>
- private static readonly ICollection<int> EventsCacheLifecycle0 = new[]
+ private static readonly ICollection<int> CacheLifecycleAll0 = new[]
{
- EventCacheStarted,
- EventCacheStopped,
- EventCacheNodesLeft
+ CacheStarted,
+ CacheStopped,
+ CacheNodesLeft
}.AsReadOnly();
/// <summary>
/// All cache query events.
/// </summary>
- private static readonly ICollection<int> EventsCacheQuery0 = new[]
+ private static readonly ICollection<int> CacheQueryAll0 = new[]
{
- EventCacheQueryExecuted,
- EventCacheQueryObjectRead
+ CacheQueryExecuted,
+ CacheQueryObjectRead
}.AsReadOnly();
/// <summary>
/// All swap space events.
/// </summary>
- private static readonly ICollection<int> EventsSwapspace0 = new[]
+ private static readonly ICollection<int> SwapspaceAll0 = new[]
{
- EventSwapSpaceCleared,
- EventSwapSpaceDataRemoved,
- EventSwapSpaceDataRead,
- EventSwapSpaceDataStored,
- EventSwapSpaceDataEvicted
+ SwapSpaceCleared,
+ SwapSpaceDataRemoved,
+ SwapSpaceDataRead,
+ SwapSpaceDataStored,
+ SwapSpaceDataEvicted
}.AsReadOnly();
/// <summary>
/// All Ignite events.
/// </summary>
- private static readonly ICollection<int> EventsAll0 = GetAllEvents().AsReadOnly();
+ private static readonly ICollection<int> All0 = GetAllEvents().AsReadOnly();
/// <summary>
/// All Ignite events (<b>excluding</b> metric update event).
/// </summary>
- private static readonly ICollection<int> EventsAllMinusMetricUpdate0 =
- EventsAll0.Where(x => x != EventNodeMetricsUpdated).ToArray().AsReadOnly();
+ private static readonly ICollection<int> AllMinusMetricUpdate0 =
+ All0.Where(x => x != NodeMetricsUpdated).ToArray().AsReadOnly();
/// <summary>
/// All events indicating an error or failure condition. It is convenient to use when fetching all events
/// indicating error or failure.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsError
+ public static ICollection<int> ErrorAll
{
- get { return EventsError0; }
+ get { return ErrorAll0; }
}
/// <summary>
/// All Ignite events (<b>excluding</b> metric update event).
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsAllMinusMetricUpdate
+ public static ICollection<int> AllMinusMetricUpdate
{
- get { return EventsAllMinusMetricUpdate0; }
+ get { return AllMinusMetricUpdate0; }
}
/// <summary>
/// All swap space events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsSwapspace
+ public static ICollection<int> SwapspaceAll
{
- get { return EventsSwapspace0; }
+ get { return SwapspaceAll0; }
}
/// <summary>
/// All cache query events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsCacheQuery
+ public static ICollection<int> CacheQueryAll
{
- get { return EventsCacheQuery0; }
+ get { return CacheQueryAll0; }
}
/// <summary>
/// All cache lifecycle events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsCacheLifecycle
+ public static ICollection<int> CacheLifecycleAll
{
- get { return EventsCacheLifecycle0; }
+ get { return CacheLifecycleAll0; }
}
/// <summary>
/// All cache rebalance events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsCacheRebalance
+ public static ICollection<int> CacheRebalanceAll
{
- get { return EventsCacheRebalance0; }
+ get { return CacheRebalanceAll0; }
}
/// <summary>
/// All cache events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsCache
+ public static ICollection<int> CacheAll
{
- get { return EventsCache0; }
+ get { return CacheAll0; }
}
/// <summary>
/// All Ignite task execution events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsTaskExecution
+ public static ICollection<int> TaskExecutionAll
{
- get { return EventsTaskExecution0; }
+ get { return TaskExecutionAll0; }
}
/// <summary>
/// All Ignite job execution events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsJobExecution
+ public static ICollection<int> JobExecutionAll
{
- get { return EventsJobExecution0; }
+ get { return JobExecutionAll0; }
}
/// <summary>
/// All discovery events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsDiscoveryAll
+ public static ICollection<int> DiscoveryAll
{
- get { return EventsDiscoveryAll0; }
+ get { return DiscoveryAll0; }
}
/// <summary>
- /// All discovery events except for <see cref="EventNodeMetricsUpdated" />. Subscription to <see
- /// cref="EventNodeMetricsUpdated" /> can generate massive amount of event processing in most cases is not
+ /// All discovery events except for <see cref="NodeMetricsUpdated" />. Subscription to <see
+ /// cref="NodeMetricsUpdated" /> can generate massive amount of event processing in most cases is not
/// necessary. If this event is indeed required you can subscribe to it individually or use <see
- /// cref="EventsDiscoveryAll0" /> array.
+ /// cref="DiscoveryAll0" /> array.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsDiscovery
+ public static ICollection<int> DiscoveryAllMinusMetrics
{
- get { return EventsDiscovery0; }
+ get { return DiscoveryAllMinusMetrics0; }
}
/// <summary>
/// All Ignite events.
/// </summary>
- [SuppressMessage("Microsoft.Performance", "CA1819:PropertiesShouldNotReturnArrays")]
- public static ICollection<int> EventsAll
+ public static ICollection<int> All
{
- get { return EventsAll0; }
+ get { return All0; }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/52534c33/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
index ed87309..c1f3035 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
@@ -63,7 +63,7 @@ namespace Apache.Ignite.Examples.Events
Console.WriteLine(">>> Listening for a local event...");
var listener = new LocalListener();
- ignite.GetEvents().LocalListen(listener, EventType.EventsTaskExecution);
+ ignite.GetEvents().LocalListen(listener, EventType.TaskExecutionAll);
ExecuteTask(ignite);
@@ -79,7 +79,7 @@ namespace Apache.Ignite.Examples.Events
var remoteFilter = new RemoteFilter();
var listenId = ignite.GetEvents().RemoteListen(localListener: localListener,
- remoteFilter: remoteFilter, types: EventType.EventsJobExecution);
+ remoteFilter: remoteFilter, types: EventType.JobExecutionAll);
if (listenId == null)
throw new InvalidOperationException("Subscription failed.");
[5/6] ignite git commit: IGNITE-1682 .Net: Remove RemoteListen from
Events API.
Posted by vo...@apache.org.
IGNITE-1682 .Net: Remove RemoteListen from Events API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91eeab7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91eeab7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91eeab7a
Branch: refs/heads/ignite-1655
Commit: 91eeab7ac3e29cdba4cdb9679a0c083aea8fc959
Parents: 6695e6c
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Oct 15 15:04:21 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 15:04:21 2015 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.Tests/EventsTest.cs | 38 ++++----
.../Apache.Ignite.Core.csproj | 1 +
.../Apache.Ignite.Core/Events/IEventFilter.cs | 9 +-
.../Apache.Ignite.Core/Events/IEventListener.cs | 34 +++++++
.../dotnet/Apache.Ignite.Core/Events/IEvents.cs | 97 ++------------------
.../Impl/Common/DelegateTypeDescriptor.cs | 8 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 37 +++++---
.../Impl/Events/RemoteListenEventFilter.cs | 10 +-
.../Events/EventsExample.cs | 18 ----
.../Apache.Ignite.ExamplesDll.csproj | 1 -
.../Events/LocalListener.cs | 5 +-
.../Events/RemoteFilter.cs | 42 ---------
12 files changed, 102 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
index b325d36..33841ad 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
@@ -348,7 +348,7 @@ namespace Apache.Ignite.Core.Tests
// Filter
waitTask = getWaitTask(() => events.WaitForLocal(
- new EventFilter<IEvent>((g, e) => e.Type == EventType.TaskReduced)));
+ new EventFilter<IEvent>(e => e.Type == EventType.TaskReduced)));
Assert.IsTrue(waitTask.Wait(timeout));
Assert.IsInstanceOf(typeof(TaskEvent), waitTask.Result);
@@ -356,13 +356,14 @@ namespace Apache.Ignite.Core.Tests
// Filter & types
waitTask = getWaitTask(() => events.WaitForLocal(
- new EventFilter<IEvent>((g, e) => e.Type == EventType.TaskReduced), EventType.TaskReduced));
+ new EventFilter<IEvent>(e => e.Type == EventType.TaskReduced), EventType.TaskReduced));
Assert.IsTrue(waitTask.Wait(timeout));
Assert.IsInstanceOf(typeof(TaskEvent), waitTask.Result);
Assert.AreEqual(EventType.TaskReduced, waitTask.Result.Type);
}
+ /*
/// <summary>
/// Tests RemoteListen.
/// </summary>
@@ -426,7 +427,7 @@ namespace Apache.Ignite.Core.Tests
CheckSend(1, typeof(JobEvent), expectedType); // one last event
CheckNoEvent();
- }
+ }*/
/// <summary>
/// Tests RemoteQuery.
@@ -788,7 +789,7 @@ namespace Apache.Ignite.Core.Tests
/// Gets the event listener.
/// </summary>
/// <returns>New instance of event listener.</returns>
- public static IEventFilter<IEvent> GetListener()
+ public static IEventListener<IEvent> GetListener()
{
return new EventFilter<IEvent>(Listen);
}
@@ -813,13 +814,12 @@ namespace Apache.Ignite.Core.Tests
/// <summary>
/// Listen method.
/// </summary>
- /// <param name="id">Originating node ID.</param>
/// <param name="evt">Event.</param>
- private static bool Listen(Guid? id, IEvent evt)
+ private static bool Listen(IEvent evt)
{
try
{
- LastNodeIds.Push(id);
+ LastNodeIds.Push(evt.Node.Id);
ReceivedEvents.Push(evt);
ReceivedEvent.Signal();
@@ -830,7 +830,7 @@ namespace Apache.Ignite.Core.Tests
{
// When executed on remote nodes, these exceptions will not go to sender,
// so we have to accumulate them.
- Failures.Push(string.Format("Exception in Listen (msg: {0}, id: {1}): {2}", evt, id, ex));
+ Failures.Push(string.Format("Exception in Listen (msg: {0}, id: {1}): {2}", evt, evt.Node.Id, ex));
throw;
}
}
@@ -840,28 +840,34 @@ namespace Apache.Ignite.Core.Tests
/// Test event filter.
/// </summary>
[Serializable]
- public class EventFilter<T> : IEventFilter<T> where T : IEvent
+ public class EventFilter<T> : IEventFilter<T>, IEventListener<T> where T : IEvent
{
/** */
- private readonly Func<Guid?, T, bool> _invoke;
+ private readonly Func<T, bool> _invoke;
/// <summary>
/// Initializes a new instance of the <see cref="RemoteListenEventFilter"/> class.
/// </summary>
/// <param name="invoke">The invoke delegate.</param>
- public EventFilter(Func<Guid?, T, bool> invoke)
+ public EventFilter(Func<T, bool> invoke)
{
_invoke = invoke;
}
/** <inheritdoc /> */
- bool IEventFilter<T>.Invoke(Guid? nodeId, T evt)
+ bool IEventFilter<T>.Invoke(T evt)
+ {
+ return _invoke(evt);
+ }
+
+ /** <inheritdoc /> */
+ bool IEventListener<T>.Invoke(T evt)
{
- return _invoke(nodeId, evt);
+ return _invoke(evt);
}
/** <inheritdoc /> */
- public bool Invoke(Guid nodeId, T evt)
+ public bool Invoke(T evt)
{
throw new Exception("Invalid method");
}
@@ -882,7 +888,7 @@ namespace Apache.Ignite.Core.Tests
}
/** <inheritdoc /> */
- public bool Invoke(Guid? nodeId, IEvent evt)
+ public bool Invoke(IEvent evt)
{
return evt.Type == _type;
}
@@ -906,7 +912,7 @@ namespace Apache.Ignite.Core.Tests
}
/** <inheritdoc /> */
- public bool Invoke(Guid? nodeId, IEvent evt)
+ public bool Invoke(IEvent evt)
{
return evt.Type == _type;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index a10a0a5..401b46c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -125,6 +125,7 @@
<Compile Include="Events\EventType.cs" />
<Compile Include="Events\IEvent.cs" />
<Compile Include="Events\IEventFilter.cs" />
+ <Compile Include="Events\IEventListener.cs" />
<Compile Include="Events\IEvents.cs" />
<Compile Include="Events\JobEvent.cs" />
<Compile Include="Events\SwapSpaceEvent.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventFilter.cs
index 83aca53..8c80cec 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventFilter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventFilter.cs
@@ -17,8 +17,6 @@
namespace Apache.Ignite.Core.Events
{
- using System;
-
/// <summary>
/// Represents an event filter.
/// </summary>
@@ -26,11 +24,10 @@ namespace Apache.Ignite.Core.Events
public interface IEventFilter<in T> where T : IEvent
{
/// <summary>
- /// Determines whether specified event passes this filtger.
+ /// Determines whether specified event passes this filter.
/// </summary>
- /// <param name="nodeId">Node identifier.</param>
/// <param name="evt">Event.</param>
- /// <returns>Value indicating whether specified event passes this filtger.</returns>
- bool Invoke(Guid? nodeId, T evt);
+ /// <returns>Value indicating whether specified event passes this filter.</returns>
+ bool Invoke(T evt);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventListener.cs
new file mode 100644
index 0000000..06e9ecc
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEventListener.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Events
+{
+ /// <summary>
+ /// Represents an event listener.
+ /// </summary>
+ /// <typeparam name="T">Event type.</typeparam>
+ public interface IEventListener<in T> where T : IEvent
+ {
+ /// <summary>
+ /// Invoked when event occurs.
+ /// </summary>
+ /// <param name="evt">Event.</param>
+ /// <returns>Value indicating whether this listener should be kept subscribed.
+ /// Returning false unsubscribes this listener from future notifications.</returns>
+ bool Invoke(T evt);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
index b2f07d4..e8459c6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
@@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Events
using Apache.Ignite.Core.Common;
/// <summary>
- /// Provides functionality for local and remote event notifications on nodes defined by <see cref="ClusterGroup"/>.
+ /// Provides functionality for event notifications on nodes defined by <see cref="ClusterGroup"/>.
/// <para/>
/// All members are thread-safe and may be used concurrently from multiple threads.
/// </summary>
@@ -59,91 +59,6 @@ namespace Apache.Ignite.Core.Events
where T : IEvent;
/// <summary>
- /// Adds event listener for specified events to all nodes in the cluster group (possibly including local node
- /// if it belongs to the cluster group as well). This means that all events occurring on any node within this
- /// cluster group that pass remote filter will be sent to local node for local listener notifications.
- /// <para/>
- /// The listener can be unsubscribed automatically if local node stops, if localListener callback
- /// returns false or if <see cref="StopRemoteListen"/> is called.
- /// </summary>
- /// <typeparam name="T">Type of events.</typeparam>
- /// <param name="bufSize">Remote events buffer size. Events from remote nodes won't be sent until buffer
- /// is full or time interval is exceeded.</param>
- /// <param name="interval">Maximum time interval after which events from remote node will be sent. Events
- /// from remote nodes won't be sent until buffer is full or time interval is exceeded.</param>
- /// <param name="autoUnsubscribe">Flag indicating that event listeners on remote nodes should be automatically
- /// unregistered if master node (node that initiated event listening) leaves topology.
- /// If this flag is false, listeners will be unregistered only when <see cref="StopRemoteListen"/>
- /// method is called, or the localListener returns false.</param>
- /// <param name="localListener"> Listener callback that is called on local node. If null, these events will
- /// be handled on remote nodes by passed in remoteFilter.</param>
- /// <param name="remoteFilter">
- /// Filter callback that is called on remote node. Only events that pass the remote filter will be
- /// sent to local node. If null, all events of specified types will be sent to local node.
- /// This remote filter can be used to pre-handle events remotely, before they are passed in to local callback.
- /// It will be auto-unsubscribed on the node where event occurred in case if it returns false.
- /// </param>
- /// <param name="types">
- /// Types of events to listen for. If not provided, all events that pass the provided remote filter
- /// will be sent to local node.
- /// </param>
- /// <returns>
- /// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening.
- /// </returns>
- [AsyncSupported]
- Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
- where T : IEvent;
-
- /// <summary>
- /// Adds event listener for specified events to all nodes in the cluster group (possibly including local node
- /// if it belongs to the cluster group as well). This means that all events occurring on any node within this
- /// cluster group that pass remote filter will be sent to local node for local listener notifications.
- /// <para/>
- /// The listener can be unsubscribed automatically if local node stops, if localListener callback
- /// returns false or if <see cref="StopRemoteListen"/> is called.
- /// </summary>
- /// <typeparam name="T">Type of events.</typeparam>
- /// <param name="bufSize">Remote events buffer size. Events from remote nodes won't be sent until buffer
- /// is full or time interval is exceeded.</param>
- /// <param name="interval">Maximum time interval after which events from remote node will be sent. Events
- /// from remote nodes won't be sent until buffer is full or time interval is exceeded.</param>
- /// <param name="autoUnsubscribe">Flag indicating that event listeners on remote nodes should be automatically
- /// unregistered if master node (node that initiated event listening) leaves topology.
- /// If this flag is false, listeners will be unregistered only when <see cref="StopRemoteListen"/>
- /// method is called, or the localListener returns false.</param>
- /// <param name="localListener"> Listener callback that is called on local node. If null, these events will
- /// be handled on remote nodes by passed in remoteFilter.</param>
- /// <param name="remoteFilter">
- /// Filter callback that is called on remote node. Only events that pass the remote filter will be
- /// sent to local node. If null, all events of specified types will be sent to local node.
- /// This remote filter can be used to pre-handle events remotely, before they are passed in to local callback.
- /// It will be auto-unsubscribed on the node where event occurred in case if it returns false.
- /// </param>
- /// <param name="types">
- /// Types of events to listen for. If not provided, all events that pass the provided remote filter
- /// will be sent to local node.
- /// </param>
- /// <returns>
- /// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening.
- /// </returns>
- [AsyncSupported]
- Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
- where T : IEvent;
-
- /// <summary>
- /// Stops listening to remote events. This will unregister all listeners identified with provided operation ID
- /// on all nodes defined by <see cref="ClusterGroup"/>.
- /// </summary>
- /// <param name="opId">
- /// Operation ID that was returned from
- /// <see cref="RemoteListen{T}(int, TimeSpan?, bool, IEventFilter{T},IEventFilter{T},int[])"/>.
- /// </param>
- [AsyncSupported]
- void StopRemoteListen(Guid opId);
-
- /// <summary>
/// Waits for the specified events.
/// </summary>
/// <param name="types">Types of the events to wait for.
@@ -205,7 +120,7 @@ namespace Apache.Ignite.Core.Events
/// Attempt to record internal event with this method will cause <see cref="ArgumentException"/> to be thrown.
/// </summary>
/// <param name="evt">Locally generated event.</param>
- /// <exception cref="ArgumentException">If event type is within Ignite reserved range (1 � 1000)</exception>
+ /// <exception cref="ArgumentException">If event type is within Ignite reserved range (1 to 1000)</exception>
void RecordLocal(IEvent evt);
/// <summary>
@@ -216,7 +131,7 @@ namespace Apache.Ignite.Core.Events
/// <param name="listener">Predicate that is called on each received event. If predicate returns false,
/// it will be unregistered and will stop receiving events.</param>
/// <param name="types">Event types for which this listener will be notified, should not be empty.</param>
- void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent;
+ void LocalListen<T>(IEventListener<T> listener, params int[] types) where T : IEvent;
/// <summary>
/// Adds an event listener for local events. Note that listener will be added regardless of whether
@@ -226,7 +141,7 @@ namespace Apache.Ignite.Core.Events
/// <param name="listener">Predicate that is called on each received event. If predicate returns false,
/// it will be unregistered and will stop receiving events.</param>
/// <param name="types">Event types for which this listener will be notified, should not be empty.</param>
- void LocalListen<T>(IEventFilter<T> listener, IEnumerable<int> types) where T : IEvent;
+ void LocalListen<T>(IEventListener<T> listener, IEnumerable<int> types) where T : IEvent;
/// <summary>
/// Removes local event listener.
@@ -236,7 +151,7 @@ namespace Apache.Ignite.Core.Events
/// <param name="types">Types of events for which to remove listener. If not specified, then listener
/// will be removed for all types it was registered for.</param>
/// <returns>True if listener was removed, false otherwise.</returns>
- bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent;
+ bool StopLocalListen<T>(IEventListener<T> listener, params int[] types) where T : IEvent;
/// <summary>
/// Removes local event listener.
@@ -246,7 +161,7 @@ namespace Apache.Ignite.Core.Events
/// <param name="types">Types of events for which to remove listener. If not specified, then listener
/// will be removed for all types it was registered for.</param>
/// <returns>True if listener was removed, false otherwise.</returns>
- bool StopLocalListen<T>(IEventFilter<T> listener, IEnumerable<int> types) where T : IEvent;
+ bool StopLocalListen<T>(IEventListener<T> listener, IEnumerable<int> types) where T : IEvent;
/// <summary>
/// Enables provided events. Allows to start recording events that were disabled before.
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 0f2b3c1..fb55d8e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -48,7 +48,7 @@ namespace Apache.Ignite.Core.Impl.Common
private readonly Func<object, object, object> _computeFunc;
/** */
- private readonly Func<object, Guid?, object, bool> _eventFilter;
+ private readonly Func<object, object, bool> _eventFilter;
/** */
private readonly Func<object, object, object, bool> _cacheEntryFilter;
@@ -100,7 +100,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Precompiled invocator delegate.</returns>
- public static Func<object, Guid?, object, bool> GetEventFilter(Type type)
+ public static Func<object, object, bool> GetEventFilter(Type type)
{
return Get(type)._eventFilter;
}
@@ -245,8 +245,8 @@ namespace Apache.Ignite.Core.Impl.Common
var args = iface.GetGenericArguments();
- _eventFilter = DelegateConverter.CompileFunc<Func<object, Guid?, object, bool>>(iface,
- new[] {typeof (Guid?), args[0]}, new[] {false, true, false});
+ _eventFilter = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
+ new[] {args[0]}, new[] {true, false});
}
else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>))
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index 08936e4..6898a58 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -148,7 +148,7 @@ namespace Apache.Ignite.Core.Impl.Events
if (localListener != null)
{
- var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e));
+ var listener = new RemoteListenEventFilter(Ignite, e => localListener.Invoke((T) e));
writer.WriteLong(Ignite.HandleRegistry.Allocate(listener));
}
@@ -230,11 +230,11 @@ namespace Apache.Ignite.Core.Impl.Events
/** <inheritDoc /> */
public void RecordLocal(IEvent evt)
{
- throw new NotImplementedException("GG-10244");
+ throw new NotImplementedException("IGNITE-1410");
}
/** <inheritDoc /> */
- public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+ public void LocalListen<T>(IEventListener<T> listener, params int[] types) where T : IEvent
{
IgniteArgumentCheck.NotNull(listener, "listener");
IgniteArgumentCheck.NotNullOrEmpty(types, "types");
@@ -244,13 +244,13 @@ namespace Apache.Ignite.Core.Impl.Events
}
/** <inheritDoc /> */
- public void LocalListen<T>(IEventFilter<T> listener, IEnumerable<int> types) where T : IEvent
+ public void LocalListen<T>(IEventListener<T> listener, IEnumerable<int> types) where T : IEvent
{
LocalListen(listener, TypesToArray(types));
}
/** <inheritDoc /> */
- public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+ public bool StopLocalListen<T>(IEventListener<T> listener, params int[] types) where T : IEvent
{
lock (_localFilters)
{
@@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Impl.Events
}
/** <inheritDoc /> */
- public bool StopLocalListen<T>(IEventFilter<T> listener, IEnumerable<int> types) where T : IEvent
+ public bool StopLocalListen<T>(IEventListener<T> listener, IEnumerable<int> types) where T : IEvent
{
return StopLocalListen(listener, TypesToArray(types));
}
@@ -415,7 +415,7 @@ namespace Apache.Ignite.Core.Impl.Events
/// <typeparam name="T">Type of events.</typeparam>
/// <param name="listener">Predicate that is called on each received event.</param>
/// <param name="type">Event type for which this listener will be notified</param>
- private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent
+ private void LocalListen<T>(IEventListener<T> listener, int type) where T : IEvent
{
lock (_localFilters)
{
@@ -432,7 +432,7 @@ namespace Apache.Ignite.Core.Impl.Events
if (!filters.TryGetValue(type, out localFilter))
{
- localFilter = CreateLocalFilter(listener, type);
+ localFilter = CreateLocalListener(listener, type);
filters[type] = localFilter;
}
@@ -448,10 +448,10 @@ namespace Apache.Ignite.Core.Impl.Events
/// <param name="listener">Listener.</param>
/// <param name="type">Event type.</param>
/// <returns>Created wrapper.</returns>
- private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent
+ private LocalHandledEventFilter CreateLocalListener<T>(IEventListener<T> listener, int type) where T : IEvent
{
var result = new LocalHandledEventFilter(
- stream => InvokeLocalFilter(stream, listener),
+ stream => InvokeLocalListener(stream, listener),
unused =>
{
lock (_localFilters)
@@ -484,8 +484,21 @@ namespace Apache.Ignite.Core.Impl.Events
{
var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
- // No guid in local mode
- return listener.Invoke(Guid.Empty, evt);
+ return listener.Invoke(evt);
+ }
+
+ /// <summary>
+ /// Invokes local filter using data from specified stream.
+ /// </summary>
+ /// <typeparam name="T">Event object type.</typeparam>
+ /// <param name="stream">The stream.</param>
+ /// <param name="listener">The listener.</param>
+ /// <returns>Filter invocation result.</returns>
+ private bool InvokeLocalListener<T>(IPortableStream stream, IEventListener<T> listener) where T : IEvent
+ {
+ var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
+
+ return listener.Invoke(evt);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
index ac50b35..f8e2f95 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
@@ -33,14 +33,14 @@ namespace Apache.Ignite.Core.Impl.Events
private readonly Ignite _ignite;
/** */
- private readonly Func<Guid?, IEvent, bool> _filter;
+ private readonly Func<IEvent, bool> _filter;
/// <summary>
/// Initializes a new instance of the <see cref="RemoteListenEventFilter"/> class.
/// </summary>
/// <param name="ignite">The grid.</param>
/// <param name="filter">The filter.</param>
- public RemoteListenEventFilter(Ignite ignite, Func<Guid?, IEvent, bool> filter)
+ public RemoteListenEventFilter(Ignite ignite, Func<IEvent, bool> filter)
{
_ignite = ignite;
_filter = filter;
@@ -53,9 +53,9 @@ namespace Apache.Ignite.Core.Impl.Events
var evt = EventReader.Read<IEvent>(reader);
- var nodeId = reader.ReadGuid();
+ reader.ReadGuid(); // unused node id
- return _filter(nodeId, evt) ? 1 : 0;
+ return _filter(evt) ? 1 : 0;
}
/// <summary>
@@ -78,7 +78,7 @@ namespace Apache.Ignite.Core.Impl.Events
var func = DelegateTypeDescriptor.GetEventFilter(pred.GetType());
- return new RemoteListenEventFilter(grid, (id, evt) => func(pred, id, evt));
+ return new RemoteListenEventFilter(grid, evt => func(pred, evt));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
index c1f3035..f9d54b9 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Events/EventsExample.cs
@@ -71,24 +71,6 @@ namespace Apache.Ignite.Examples.Events
Console.WriteLine(">>> Received events count: " + listener.EventsReceived);
Console.WriteLine();
-
- // Remote listen example (start standalone nodes for better demonstration)
- Console.WriteLine(">>> Listening for remote events...");
-
- var localListener = new LocalListener();
- var remoteFilter = new RemoteFilter();
-
- var listenId = ignite.GetEvents().RemoteListen(localListener: localListener,
- remoteFilter: remoteFilter, types: EventType.JobExecutionAll);
-
- if (listenId == null)
- throw new InvalidOperationException("Subscription failed.");
-
- ExecuteTask(ignite);
-
- ignite.GetEvents().StopRemoteListen(listenId.Value);
-
- Console.WriteLine(">>> Received events count: " + localListener.EventsReceived);
}
Console.WriteLine();
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Apache.Ignite.ExamplesDll.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Apache.Ignite.ExamplesDll.csproj b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Apache.Ignite.ExamplesDll.csproj
index d579a77..441e4e0 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Apache.Ignite.ExamplesDll.csproj
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Apache.Ignite.ExamplesDll.csproj
@@ -49,7 +49,6 @@
<Compile Include="Datagrid\ContinuousQueryFilter.cs" />
<Compile Include="Datagrid\EmployeeStore.cs" />
<Compile Include="Events\LocalListener.cs" />
- <Compile Include="Events\RemoteFilter.cs" />
<Compile Include="Messaging\LocalListener.cs" />
<Compile Include="Messaging\RemoteOrderedListener.cs" />
<Compile Include="Messaging\RemoteUnorderedListener.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
index 067bd2a..8c689dc 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
@@ -24,7 +24,7 @@ namespace Apache.Ignite.ExamplesDll.Events
/// <summary>
/// Local event listener.
/// </summary>
- public class LocalListener : IEventFilter<IEvent>
+ public class LocalListener : IEventListener<IEvent>
{
/** Сount of received events. */
private int _eventsReceived;
@@ -40,10 +40,9 @@ namespace Apache.Ignite.ExamplesDll.Events
/// <summary>
/// Determines whether specified event passes this filter.
/// </summary>
- /// <param name="nodeId">Node identifier.</param>
/// <param name="evt">Event.</param>
/// <returns>Value indicating whether specified event passes this filter.</returns>
- public bool Invoke(Guid? nodeId, IEvent evt)
+ public bool Invoke(IEvent evt)
{
Interlocked.Increment(ref _eventsReceived);
http://git-wip-us.apache.org/repos/asf/ignite/blob/91eeab7a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/RemoteFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/RemoteFilter.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/RemoteFilter.cs
deleted file mode 100644
index 45a957c..0000000
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/RemoteFilter.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using Apache.Ignite.Core.Events;
-
-namespace Apache.Ignite.ExamplesDll.Events
-{
- /// <summary>
- /// Remote event filter.
- /// </summary>
- [Serializable]
- public class RemoteFilter : IEventFilter<IEvent>
- {
- /// <summary>
- /// Determines whether specified event passes this filter.
- /// </summary>
- /// <param name="nodeId">Node identifier.</param>
- /// <param name="evt">Event.</param>
- /// <returns>Value indicating whether specified event passes this filter.</returns>
- public bool Invoke(Guid? nodeId, IEvent evt)
- {
- Console.WriteLine("Remote filter received event [evt={0}]", evt.Name);
-
- return evt is JobEvent;
- }
- }
-}
\ No newline at end of file
[2/6] ignite git commit: IGNITE-1664 .Net: Reviewed and fixed generic
type arguments in the API.
Posted by vo...@apache.org.
IGNITE-1664 .Net: Reviewed and fixed generic type arguments in the API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81feb959
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81feb959
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81feb959
Branch: refs/heads/ignite-1655
Commit: 81feb9596657c432efe404e6046a142b8f831df3
Parents: 52534c3
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Wed Oct 14 14:56:17 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Oct 14 14:56:17 2015 +0300
----------------------------------------------------------------------
.../Cache/CacheTestAsyncWrapper.cs | 10 +--
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 14 ++--
.../Cache/ICacheEntryProcessor.cs | 8 +-
.../Compute/ComputeTaskAdapter.cs | 16 ++--
.../Compute/ComputeTaskSplitAdapter.cs | 8 +-
.../Apache.Ignite.Core/Compute/ICompute.cs | 82 ++++++++++----------
.../Apache.Ignite.Core/Compute/IComputeFunc.cs | 8 +-
.../Apache.Ignite.Core/Compute/IComputeJob.cs | 4 +-
.../Compute/IComputeJobResult.cs | 6 +-
.../Compute/IComputeReducer.cs | 8 +-
.../Apache.Ignite.Core/Compute/IComputeTask.cs | 24 +++---
.../Datastream/StreamTransformer.cs | 12 +--
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 18 ++---
.../Impl/Cache/CacheProxyImpl.cs | 6 +-
.../Apache.Ignite.Core/Impl/Compute/Compute.cs | 40 +++++-----
.../Impl/Compute/ComputeAsync.cs | 61 ++++++++-------
.../Impl/Compute/ComputeImpl.cs | 79 ++++++++++---------
17 files changed, 209 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
index 52a856a..c4e8d7e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncWrapper.cs
@@ -340,20 +340,20 @@ namespace Apache.Ignite.Core.Tests.Cache
}
/** <inheritDoc /> */
- public TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
_cache.Invoke(key, processor, arg);
- return GetResult<TR>();
+ return GetResult<TRes>();
}
/** <inheritDoc /> */
- public IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys,
- ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys,
+ ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
_cache.InvokeAll(keys, processor, arg);
- return GetResult<IDictionary<TK, ICacheEntryProcessorResult<TR>>>();
+ return GetResult<IDictionary<TK, ICacheEntryProcessorResult<TRes>>>();
}
/** <inheritDoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index 097ab66..98ac254 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -451,15 +451,15 @@ namespace Apache.Ignite.Core.Cache
/// If an entry does not exist for the specified key, an attempt is made to load it (if a loader is configured)
/// or a surrogate entry, consisting of the key with a null value is used instead.
/// </summary>
- /// <typeparam name="TR">The type of the result.</typeparam>
- /// <typeparam name="TA">The type of the argument.</typeparam>
+ /// <typeparam name="TArg">The type of the argument.</typeparam>
+ /// <typeparam name="TRes">The type of the result.</typeparam>
/// <param name="key">The key.</param>
/// <param name="processor">The processor.</param>
/// <param name="arg">The argument.</param>
/// <returns>Result of the processing.</returns>
/// <exception cref="CacheEntryProcessorException">If an exception has occured during processing.</exception>
[AsyncSupported]
- TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg);
+ TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg);
/// <summary>
/// Invokes an <see cref="ICacheEntryProcessor{K, V, A, R}"/> against a set of keys.
@@ -471,8 +471,8 @@ namespace Apache.Ignite.Core.Cache
/// Furthermore there is no guarantee implementations will use the same processor instance
/// to process each entry, as the case may be in a non-local cache topology.
/// </summary>
- /// <typeparam name="TR">The type of the result.</typeparam>
- /// <typeparam name="TA">The type of the argument.</typeparam>
+ /// <typeparam name="TArg">The type of the argument.</typeparam>
+ /// <typeparam name="TRes">The type of the result.</typeparam>
/// <param name="keys">The keys.</param>
/// <param name="processor">The processor.</param>
/// <param name="arg">The argument.</param>
@@ -483,8 +483,8 @@ namespace Apache.Ignite.Core.Cache
/// </returns>
/// <exception cref="CacheEntryProcessorException">If an exception has occured during processing.</exception>
[AsyncSupported]
- IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys,
- ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg);
+ IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys,
+ ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg);
/// <summary>
/// Creates an <see cref="ICacheLock"/> instance associated with passed key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheEntryProcessor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheEntryProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheEntryProcessor.cs
index c8614c0..7a0fff1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheEntryProcessor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICacheEntryProcessor.cs
@@ -30,9 +30,9 @@ namespace Apache.Ignite.Core.Cache
/// </summary>
/// <typeparam name="TK">Key type.</typeparam>
/// <typeparam name="TV">Value type.</typeparam>
- /// <typeparam name="TA">The type of the processor argument.</typeparam>
- /// <typeparam name="TR">The type of the processor result.</typeparam>
- public interface ICacheEntryProcessor<in TK, TV, in TA, out TR>
+ /// <typeparam name="TArg">The type of the processor argument.</typeparam>
+ /// <typeparam name="TRes">The type of the processor result.</typeparam>
+ public interface ICacheEntryProcessor<in TK, TV, in TArg, out TRes>
{
/// <summary>
/// Process an entry.
@@ -40,6 +40,6 @@ namespace Apache.Ignite.Core.Cache
/// <param name="entry">The entry to process.</param>
/// <param name="arg">The argument.</param>
/// <returns>Processing result.</returns>
- TR Process(IMutableCacheEntry<TK, TV> entry, TA arg);
+ TRes Process(IMutableCacheEntry<TK, TV> entry, TArg arg);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskAdapter.cs
index 67f7432..5965d2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskAdapter.cs
@@ -23,9 +23,9 @@ namespace Apache.Ignite.Core.Compute
using Apache.Ignite.Core.Common;
/// <summary>
- /// Convenience adapter for <see cref="IComputeTask{A,T,R}"/> interface
+ /// Convenience adapter for <see cref="IComputeTask{TArg,TJobRes,TTaskRes}"/> interface
/// </summary>
- public abstract class ComputeTaskAdapter<TA, T, TR> : IComputeTask<TA, T, TR>
+ public abstract class ComputeTaskAdapter<TArg, TJobRes, TTaskRes> : IComputeTask<TArg, TJobRes, TTaskRes>
{
/// <summary>
/// Default implementation which will wait for all jobs to complete before
@@ -42,7 +42,8 @@ namespace Apache.Ignite.Core.Compute
/// <param name="res">Received remote Ignite executable result.</param>
/// <param name="rcvd">All previously received results.</param>
/// <returns>Result policy that dictates how to process further upcoming job results.</returns>
- public virtual ComputeJobResultPolicy Result(IComputeJobResult<T> res, IList<IComputeJobResult<T>> rcvd)
+ public virtual ComputeJobResultPolicy Result(IComputeJobResult<TJobRes> res,
+ IList<IComputeJobResult<TJobRes>> rcvd)
{
Exception err = res.Exception();
@@ -52,8 +53,9 @@ namespace Apache.Ignite.Core.Compute
err is ComputeJobFailoverException)
return ComputeJobResultPolicy.Failover;
- throw new IgniteException("Remote job threw user exception (override or implement IComputeTask.result(..) " +
- "method if you would like to have automatic failover for this exception).", err);
+ throw new IgniteException("Remote job threw user exception (override or implement " +
+ "IComputeTask.result(..) method if you would like to have automatic failover for this exception).",
+ err);
}
return ComputeJobResultPolicy.Wait;
@@ -73,7 +75,7 @@ namespace Apache.Ignite.Core.Compute
/// Map of Ignite jobs assigned to subgrid node. If <c>null</c> or empty map is returned,
/// exception will be thrown.
/// </returns>
- public abstract IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg);
+ public abstract IDictionary<IComputeJob<TJobRes>, IClusterNode> Map(IList<IClusterNode> subgrid, TArg arg);
/// <summary>
/// Reduces (or aggregates) results received so far into one compound result to be returned to
@@ -88,6 +90,6 @@ namespace Apache.Ignite.Core.Compute
/// <returns>
/// Task result constructed from results of remote executions.
/// </returns>
- public abstract TR Reduce(IList<IComputeJobResult<T>> results);
+ public abstract TTaskRes Reduce(IList<IComputeJobResult<TJobRes>> results);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskSplitAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskSplitAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskSplitAdapter.cs
index bf4685a..14651b1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskSplitAdapter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ComputeTaskSplitAdapter.cs
@@ -30,7 +30,7 @@ namespace Apache.Ignite.Core.Compute
/// in most homogeneous environments where all nodes are equally suitable for executing grid
/// job, see <see cref="Split"/> method for more details.
/// </summary>
- public abstract class ComputeTaskSplitAdapter<TA, T, TR> : ComputeTaskAdapter<TA, T, TR>
+ public abstract class ComputeTaskSplitAdapter<TArg, TJobRes, TTaskRes> : ComputeTaskAdapter<TArg, TJobRes, TTaskRes>
{
/** Random generator */
[ThreadStatic]
@@ -49,7 +49,7 @@ namespace Apache.Ignite.Core.Compute
/// <param name="gridSize">Number of available Ignite nodes. Note that returned number of jobs can be less,
/// equal or greater than this grid size.</param>
/// <param name="arg">Task execution argument. Can be <c>null</c>.</param>
- protected abstract ICollection<IComputeJob<T>> Split(int gridSize, TA arg);
+ protected abstract ICollection<IComputeJob<TJobRes>> Split(int gridSize, TArg arg);
/// <summary>
/// This method is called to map or split Ignite task into multiple Ignite jobs. This is the
@@ -66,7 +66,7 @@ namespace Apache.Ignite.Core.Compute
/// exception will be thrown.
/// </returns>
/// <exception cref="IgniteException">Split returned no jobs.</exception>
- override public IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg)
+ override public IDictionary<IComputeJob<TJobRes>, IClusterNode> Map(IList<IClusterNode> subgrid, TArg arg)
{
Debug.Assert(subgrid != null && subgrid.Count > 0);
@@ -75,7 +75,7 @@ namespace Apache.Ignite.Core.Compute
if (jobs == null || jobs.Count == 0)
throw new IgniteException("Split returned no jobs.");
- var map = new Dictionary<IComputeJob<T>, IClusterNode>(jobs.Count);
+ var map = new Dictionary<IComputeJob<TJobRes>, IClusterNode>(jobs.Count);
if (_rnd == null)
_rnd = new Random();
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
index c124f84..28471aa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs
@@ -84,8 +84,8 @@ namespace Apache.Ignite.Core.Compute
/// <param name="taskName">Java task name</param>
/// <param name="taskArg">Optional argument of task execution, can be null.</param>
/// <returns>Task result.</returns>
- /// <typeparam name="T">Type of task result.</typeparam>
- T ExecuteJavaTask<T>(string taskName, object taskArg);
+ /// <typeparam name="TRes">Type of task result.</typeparam>
+ TRes ExecuteJavaTask<TRes>(string taskName, object taskArg);
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
@@ -94,11 +94,11 @@ namespace Apache.Ignite.Core.Compute
/// <param name="task">Task to execute.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
- /// <typeparam name="TA">Argument type.</typeparam>
- /// <typeparam name="T">Type of job result.</typeparam>
- /// <typeparam name="TR">Type of reduce result.</typeparam>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of final task result.</typeparam>
[AsyncSupported]
- TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg);
+ TRes Execute<TArg, TJobRes, TRes>(IComputeTask<TArg, TJobRes, TRes> task, TArg taskArg);
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
@@ -106,10 +106,10 @@ namespace Apache.Ignite.Core.Compute
/// </summary>
/// <param name="task">Task to execute.</param>
/// <returns>Task result.</returns>
- /// <typeparam name="T">Type of job result.</typeparam>
- /// <typeparam name="TR">Type of reduce result.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
[AsyncSupported]
- TR Execute<T, TR>(IComputeTask<T, TR> task);
+ TRes Execute<TJobRes, TRes>(IComputeTask<TJobRes, TRes> task);
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
@@ -118,11 +118,11 @@ namespace Apache.Ignite.Core.Compute
/// <param name="taskType">Task type.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
- /// <typeparam name="TA">Argument type.</typeparam>
- /// <typeparam name="T">Type of job result.</typeparam>
- /// <typeparam name="TR">Type of reduce result.</typeparam>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
[AsyncSupported]
- TR Execute<TA, T, TR>(Type taskType, TA taskArg);
+ TRes Execute<TArg, TJobRes, TRes>(Type taskType, TArg taskArg);
/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
@@ -130,10 +130,10 @@ namespace Apache.Ignite.Core.Compute
/// </summary>
/// <param name="taskType">Task type.</param>
/// <returns>Task result.</returns>
- /// <typeparam name="T">Type of job result.</typeparam>
- /// <typeparam name="TR">Type of reduce result.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of reduce result.</typeparam>
[AsyncSupported]
- TR Execute<T, TR>(Type taskType);
+ TRes Execute<TJobRes, TRes>(Type taskType);
/// <summary>
/// Executes provided job on a node in this grid projection. The result of the
@@ -141,9 +141,9 @@ namespace Apache.Ignite.Core.Compute
/// </summary>
/// <param name="clo">Job to execute.</param>
/// <returns>Job result for this execution.</returns>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- TR Call<TR>(IComputeFunc<TR> clo);
+ TRes Call<TRes>(IComputeFunc<TRes> clo);
/// <summary>
/// Executes given job on the node where data for provided affinity key is located
@@ -153,29 +153,30 @@ namespace Apache.Ignite.Core.Compute
/// <param name="affinityKey">Affinity key.</param>
/// <param name="clo">Job to execute.</param>
/// <returns>Job result for this execution.</returns>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo);
+ TRes AffinityCall<TRes>(string cacheName, object affinityKey, IComputeFunc<TRes> clo);
/// <summary>
/// Executes collection of jobs on nodes within this grid projection.
/// </summary>
/// <param name="clos">Collection of jobs to execute.</param>
- /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+ /// <param name="reducer">Reducer to reduce all job results into one individual return value.</param>
/// <returns>Reduced job result for this execution.</returns>
- /// <typeparam name="TR1">Type of job result.</typeparam>
- /// <typeparam name="TR2">Type of reduced result.</typeparam>
+ /// <typeparam name="TFuncRes">Type of function result.</typeparam>
+ /// <typeparam name="TRes">Type of result after reduce.</typeparam>
[AsyncSupported]
- TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc);
+ TRes Call<TFuncRes, TRes>(IEnumerable<IComputeFunc<TFuncRes>> clos,
+ IComputeReducer<TFuncRes, TRes> reducer);
/// <summary>
/// Executes collection of jobs on nodes within this grid projection.
/// </summary>
/// <param name="clos">Collection of jobs to execute.</param>
/// <returns>Collection of job results for this execution.</returns>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos);
+ ICollection<TRes> Call<TRes>(IEnumerable<IComputeFunc<TRes>> clos);
/// <summary>
/// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
@@ -183,7 +184,7 @@ namespace Apache.Ignite.Core.Compute
/// <param name="clo">Job to broadcast to all projection nodes.</param>
/// <returns>Collection of results for this execution.</returns>
[AsyncSupported]
- ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo);
+ ICollection<TRes> Broadcast<TRes>(IComputeFunc<TRes> clo);
/// <summary>
/// Broadcasts given closure job with passed in argument to all nodes in grid projection.
@@ -192,10 +193,10 @@ namespace Apache.Ignite.Core.Compute
/// <param name="clo">Job to broadcast to all projection nodes.</param>
/// <param name="arg">Job closure argument.</param>
/// <returns>Collection of results for this execution.</returns>
- /// <typeparam name="T">Type of argument.</typeparam>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg);
+ ICollection<TRes> Broadcast<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg);
/// <summary>
/// Broadcasts given job to all nodes in grid projection.
@@ -234,10 +235,10 @@ namespace Apache.Ignite.Core.Compute
/// <param name="clo">Job to run.</param>
/// <param name="arg">Job argument.</param>
/// <returns>Job result for this execution.</returns>
- /// <typeparam name="T">Type of argument.</typeparam>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg);
+ TRes Apply<TArg, TRes>(IComputeFunc<TArg, TRes> clo, TArg arg);
/// <summary>
/// Executes provided closure job on nodes within this grid projection. A new job is executed for
@@ -247,10 +248,10 @@ namespace Apache.Ignite.Core.Compute
/// <param name="clo">Job to run.</param>
/// <param name="args">Job arguments.</param>
/// <returns>Сollection of job results.</returns>
- /// <typeparam name="T">Type of argument.</typeparam>
- /// <typeparam name="TR">Type of job result.</typeparam>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TRes">Type of job result.</typeparam>
[AsyncSupported]
- ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args);
+ ICollection<TRes> Apply<TArg, TRes>(IComputeFunc<TArg, TRes> clo, IEnumerable<TArg> args);
/// <summary>
/// Executes provided closure job on nodes within this grid projection. A new job is executed for
@@ -262,10 +263,11 @@ namespace Apache.Ignite.Core.Compute
/// <param name="args">Job arguments.</param>
/// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
/// <returns>Reduced job result for this execution.</returns>
- /// <typeparam name="T">Type of argument.</typeparam>
- /// <typeparam name="TR1">Type of job result.</typeparam>
- /// <typeparam name="TR2">Type of reduced result.</typeparam>
+ /// <typeparam name="TArg">Type of argument.</typeparam>
+ /// <typeparam name="TFuncRes">Type of function result.</typeparam>
+ /// <typeparam name="TRes">Type of result after reduce.</typeparam>
[AsyncSupported]
- TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc);
+ TRes Apply<TArg, TFuncRes, TRes>(IComputeFunc<TArg, TFuncRes> clo, IEnumerable<TArg> args,
+ IComputeReducer<TFuncRes, TRes> rdc);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeFunc.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeFunc.cs
index 4a43f11..9c6cdf7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeFunc.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeFunc.cs
@@ -20,26 +20,26 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Defines function having a single argument.
/// </summary>
- public interface IComputeFunc<in T, out TR>
+ public interface IComputeFunc<in TArg, out TRes>
{
/// <summary>
/// Invoke function.
/// </summary>
/// <param name="arg">Argument.</param>
/// <returns>Result.</returns>
- TR Invoke(T arg);
+ TRes Invoke(TArg arg);
}
/// <summary>
/// Defines function having no arguments.
/// </summary>
- public interface IComputeFunc<out T>
+ public interface IComputeFunc<out TRes>
{
/// <summary>
/// Invoke function.
/// </summary>
/// <returns>Result.</returns>
- T Invoke();
+ TRes Invoke();
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJob.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJob.cs
index a755bac..684ff95 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJob.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJob.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Core.Compute
/// Ignite job implementation can be injected with <see cref="IIgnite"/> using
/// <see cref="InstanceResourceAttribute"/> attribute.
/// </summary>
- public interface IComputeJob<out T>
+ public interface IComputeJob<out TRes>
{
/// <summary>
/// Executes this job.
@@ -44,7 +44,7 @@ namespace Apache.Ignite.Core.Compute
/// in <see cref="IComputeJobResult{T}"/> object passed into
/// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{T}, IList{IComputeJobResult{T}})"/>
/// on caller node.</returns>
- T Execute();
+ TRes Execute();
/// <summary>
/// This method is called when system detects that completion of this
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJobResult.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJobResult.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJobResult.cs
index 5891fd7..6369eb5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJobResult.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeJobResult.cs
@@ -25,7 +25,7 @@ namespace Apache.Ignite.Core.Compute
/// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{T}, IList{IComputeJobResult{T}})"/>
/// method.
/// </summary>
- public interface IComputeJobResult<out T>
+ public interface IComputeJobResult<out TRes>
{
/// <summary>
/// Gets data returned by remote job if it didn't fail. This data is the
@@ -39,13 +39,13 @@ namespace Apache.Ignite.Core.Compute
///
/// </summary>
/// <returns>Data returned by job.</returns>
- T Data();
+ TRes Data();
/// <summary>
/// Gets local instance of remote job produced this result.
/// </summary>
/// <returns></returns>
- IComputeJob<T> Job();
+ IComputeJob<TRes> Job();
/// <summary>
/// Gets exception produced by execution of remote job, or <c>null</c> if no
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeReducer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeReducer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeReducer.cs
index 46dcbd9..c2e6087 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeReducer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeReducer.cs
@@ -20,7 +20,9 @@ namespace Apache.Ignite.Core.Compute
/// <summary>
/// Compute reducer which is capable of result collecting and reducing.
/// </summary>
- public interface IComputeReducer<in TR1, out TR2>
+ /// <typeparam name="TRes">Type of results passed for reducing.</typeparam>
+ /// <typeparam name="TReduceRes">Type of reduced result.</typeparam>
+ public interface IComputeReducer<in TRes, out TReduceRes>
{
/// <summary>
/// Collect closure execution result.
@@ -28,12 +30,12 @@ namespace Apache.Ignite.Core.Compute
/// <param name="res">Result.</param>
/// <returns><c>True</c> to continue collecting results until all closures are finished,
/// <c>false</c> to start reducing.</returns>
- bool Collect(TR1 res);
+ bool Collect(TRes res);
/// <summary>
/// Reduce closure execution results collected earlier.
/// </summary>
/// <returns>Reduce result.</returns>
- TR2 Reduce();
+ TReduceRes Reduce();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTask.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTask.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTask.cs
index 21b6c48..d3d7ccf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTask.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/IComputeTask.cs
@@ -33,7 +33,7 @@ namespace Apache.Ignite.Core.Compute
/// <description>Inject annotated resources into task instance.</description>
/// </item>
/// <item>
- /// <description>Apply <see cref="IComputeTask{A,T,R}.Map(IList{IClusterNode}, TA)"/>.
+ /// <description>Apply <see cref="IComputeTask{A,T,R}.Map(IList{IClusterNode}, TArg)"/>.
/// This method is responsible for splitting business logic into multiple jobs
/// (units of execution) and mapping them to Ignite nodes.</description>
/// </item>
@@ -42,7 +42,7 @@ namespace Apache.Ignite.Core.Compute
/// </item>
/// <item>
/// <description>Once job execution results become available method
- /// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{T}, IList{IComputeJobResult{T}})"/>
+ /// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{TJobRes}, IList{IComputeJobResult{TJobRes}})"/>
/// will be called for ech received job result. The policy returned by this method will
/// determine the way task reacts to every job result.
/// <para />
@@ -66,19 +66,19 @@ namespace Apache.Ignite.Core.Compute
/// </item>
/// <item>
/// <description>Once all results are received or
- /// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{T}, IList{IComputeJobResult{T}})"/>
+ /// <see cref="IComputeTask{A,T,R}.Result(IComputeJobResult{TJobRes}, IList{IComputeJobResult{TJobRes}})"/>
/// method returned <see cref="ComputeJobResultPolicy.Reduce"/> policy, method
- /// <see cref="IComputeTask{A,T,R}.Reduce(IList{IComputeJobResult{T}})"/>
+ /// <see cref="IComputeTask{A,T,R}.Reduce(IList{IComputeJobResult{TJobRes}})"/>
/// is called to aggregate received results into one final result. Once this method is finished the
/// execution of the Ignite task is complete. This result will be returned to the user through future.
/// </description>
/// </item>
/// </list>
/// </summary>
- /// <typeparam name="TA">Argument type.</typeparam>
- /// <typeparam name="T">Type of job result.</typeparam>
- /// <typeparam name="TR">Type of reduce result.</typeparam>
- public interface IComputeTask<in TA, T, out TR>
+ /// <typeparam name="TArg">Argument type.</typeparam>
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ /// <typeparam name="TRes">Type of final task result after reduce.</typeparam>
+ public interface IComputeTask<in TArg, TJobRes, out TRes>
{
/// <summary>
/// This method is called to map or split Ignite task into multiple Ignite jobs. This is the
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Compute
/// as the one passed into <c>ICompute.Execute()</c> methods.</param>
/// <returns>Map of Ignite jobs assigned to subgrid node. If <c>null</c> or empty map is returned,
/// exception will be thrown.</returns>
- IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg);
+ IDictionary<IComputeJob<TJobRes>, IClusterNode> Map(IList<IClusterNode> subgrid, TArg arg);
/// <summary>
/// Asynchronous callback invoked every time a result from remote execution is
@@ -105,7 +105,7 @@ namespace Apache.Ignite.Core.Compute
/// <param name="rcvd">All previously received results. Note that if task class has
/// <see cref="ComputeTaskNoResultCacheAttribute"/> attribute, then this list will be empty.</param>
/// <returns>Result policy that dictates how to process further upcoming job results.</returns>
- ComputeJobResultPolicy Result(IComputeJobResult<T> res, IList<IComputeJobResult<T>> rcvd);
+ ComputeJobResultPolicy Result(IComputeJobResult<TJobRes> res, IList<IComputeJobResult<TJobRes>> rcvd);
/// <summary>
/// Reduces (or aggregates) results received so far into one compound result to be returned to
@@ -118,14 +118,14 @@ namespace Apache.Ignite.Core.Compute
/// <param name="results">Received job results. Note that if task class has
/// <see cref="ComputeTaskNoResultCacheAttribute"/> attribute, then this list will be empty.</param>
/// <returns>Task result constructed from results of remote executions.</returns>
- TR Reduce(IList<IComputeJobResult<T>> results);
+ TRes Reduce(IList<IComputeJobResult<TJobRes>> results);
}
/// <summary>
/// IComputeTask without an argument.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")]
- public interface IComputeTask<T, out TR> : IComputeTask<object, T, TR>
+ public interface IComputeTask<TJobRes, out TReduceRes> : IComputeTask<object, TJobRes, TReduceRes>
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
index 0398342..d8b4620 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
@@ -30,19 +30,19 @@ namespace Apache.Ignite.Core.Datastream
/// </summary>
/// <typeparam name="TK">Key type.</typeparam>
/// <typeparam name="TV">Value type.</typeparam>
- /// <typeparam name="TA">The type of the processor argument.</typeparam>
- /// <typeparam name="TR">The type of the processor result.</typeparam>
- public sealed class StreamTransformer<TK, TV, TA, TR> : IStreamReceiver<TK, TV>,
+ /// <typeparam name="TArg">The type of the processor argument.</typeparam>
+ /// <typeparam name="TRes">The type of the processor result.</typeparam>
+ public sealed class StreamTransformer<TK, TV, TArg, TRes> : IStreamReceiver<TK, TV>,
IPortableWriteAware
{
/** Entry processor. */
- private readonly ICacheEntryProcessor<TK, TV, TA, TR> _proc;
+ private readonly ICacheEntryProcessor<TK, TV, TArg, TRes> _proc;
/// <summary>
/// Initializes a new instance of the <see cref="StreamTransformer{K, V, A, R}"/> class.
/// </summary>
/// <param name="proc">Entry processor.</param>
- public StreamTransformer(ICacheEntryProcessor<TK, TV, TA, TR> proc)
+ public StreamTransformer(ICacheEntryProcessor<TK, TV, TArg, TRes> proc)
{
IgniteArgumentCheck.NotNull(proc, "proc");
@@ -57,7 +57,7 @@ namespace Apache.Ignite.Core.Datastream
foreach (var entry in entries)
keys.Add(entry.Key);
- cache.InvokeAll(keys, _proc, default(TA));
+ cache.InvokeAll(keys, _proc, default(TArg));
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 27c53ad..dcecc52 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -520,33 +520,33 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/** <inheritdoc /> */
- public TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(processor, "processor");
var holder = new CacheEntryProcessorHolder(processor, arg,
- (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TA)a), typeof(TK), typeof(TV));
+ (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
return DoOutInOp((int)CacheOp.Invoke, writer =>
{
writer.Write(key);
writer.Write(holder);
},
- input => GetResultOrThrow<TR>(Unmarshal<object>(input)));
+ input => GetResultOrThrow<TRes>(Unmarshal<object>(input)));
}
/** <inheritdoc /> */
- public IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys,
- ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys,
+ ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
IgniteArgumentCheck.NotNull(keys, "keys");
IgniteArgumentCheck.NotNull(processor, "processor");
var holder = new CacheEntryProcessorHolder(processor, arg,
- (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TA)a), typeof(TK), typeof(TV));
+ (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
return DoOutInOp((int)CacheOp.InvokeAll, writer =>
{
@@ -556,10 +556,10 @@ namespace Apache.Ignite.Core.Impl.Cache
input =>
{
if (IsAsync)
- _invokeAllConverter.Value = (Func<PortableReaderImpl, IDictionary<TK, ICacheEntryProcessorResult<TR>>>)
- (reader => ReadInvokeAllResults<TR>(reader.Stream));
+ _invokeAllConverter.Value = (Func<PortableReaderImpl, IDictionary<TK, ICacheEntryProcessorResult<TRes>>>)
+ (reader => ReadInvokeAllResults<TRes>(reader.Stream));
- return ReadInvokeAllResults<TR>(input);
+ return ReadInvokeAllResults<TRes>(input);
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs
index bfd7866..0f868d8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs
@@ -421,7 +421,7 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/** <inheritDoc /> */
- public TR Invoke<TR, TA>(TK key, ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
var result = _cache.Invoke(key, processor, arg);
@@ -431,8 +431,8 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/** <inheritDoc /> */
- public IDictionary<TK, ICacheEntryProcessorResult<TR>> InvokeAll<TR, TA>(IEnumerable<TK> keys,
- ICacheEntryProcessor<TK, TV, TA, TR> processor, TA arg)
+ public IDictionary<TK, ICacheEntryProcessorResult<TRes>> InvokeAll<TArg, TRes>(IEnumerable<TK> keys,
+ ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
var result = _cache.InvokeAll(keys, processor, arg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
index 7efabd1..d7fc59f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
@@ -98,72 +98,73 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ public TReduceRes ExecuteJavaTask<TReduceRes>(string taskName, object taskArg)
{
- return _compute.ExecuteJavaTask<T>(taskName, taskArg);
+ return _compute.ExecuteJavaTask<TReduceRes>(taskName, taskArg);
}
/** <inheritDoc /> */
- public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg)
{
return _compute.Execute(task, taskArg).Get();
}
/** <inheritDoc /> */
- public TR Execute<T, TR>(IComputeTask<T, TR> task)
+ public TJobRes Execute<TArg, TJobRes>(IComputeTask<TArg, TJobRes> task)
{
return _compute.Execute(task, null).Get();
}
/** <inheritDoc /> */
- public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg)
{
- return _compute.Execute<TA, T, TR>(taskType, taskArg).Get();
+ return _compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg).Get();
}
- public TR Execute<T, TR>(Type taskType)
+ public TReduceRes Execute<TArg, TReduceRes>(Type taskType)
{
- return _compute.Execute<object, T, TR>(taskType, null).Get();
+ return _compute.Execute<object, TArg, TReduceRes>(taskType, null).Get();
}
/** <inheritDoc /> */
- public TR Call<TR>(IComputeFunc<TR> clo)
+ public TJobRes Call<TJobRes>(IComputeFunc<TJobRes> clo)
{
return _compute.Execute(clo).Get();
}
/** <inheritDoc /> */
- public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ public TJobRes AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo)
{
return _compute.AffinityCall(cacheName, affinityKey, clo).Get();
}
/** <inheritDoc /> */
- public TR Call<TR>(Func<TR> func)
+ public TJobRes Call<TJobRes>(Func<TJobRes> func)
{
return _compute.Execute(func).Get();
}
/** <inheritDoc /> */
- public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ public ICollection<TJobRes> Call<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos)
{
return _compute.Execute(clos).Get();
}
/** <inheritDoc /> */
- public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ public TReduceRes Call<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos,
+ IComputeReducer<TJobRes, TReduceRes> reducer)
{
- return _compute.Execute(clos, rdc).Get();
+ return _compute.Execute(clos, reducer).Get();
}
/** <inheritDoc /> */
- public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+ public ICollection<TJobRes> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo)
{
return _compute.Broadcast(clo).Get();
}
/** <inheritDoc /> */
- public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public ICollection<TJobRes> Broadcast<T, TJobRes>(IComputeFunc<T, TJobRes> clo, T arg)
{
return _compute.Broadcast(clo, arg).Get();
}
@@ -193,19 +194,20 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public TJobRes Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
return _compute.Apply(clo, arg).Get();
}
/** <inheritDoc /> */
- public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ public ICollection<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args)
{
return _compute.Apply(clo, args).Get();
}
/** <inheritDoc /> */
- public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+ public TReduceRes Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo,
+ IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc)
{
return _compute.Apply(clo, args, rdc).Get();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
index 26c9bf4..89c5b83 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
@@ -117,71 +117,71 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ public TReduceRes ExecuteJavaTask<TReduceRes>(string taskName, object taskArg)
{
- _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg);
+ _curFut.Value = Compute.ExecuteJavaTaskAsync<TReduceRes>(taskName, taskArg);
- return default(T);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task, TArg taskArg)
{
_curFut.Value = Compute.Execute(task, taskArg);
- return default(TR);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public TR Execute<T, TR>(IComputeTask<T, TR> task)
+ public TReduceRes Execute<TJobRes, TReduceRes>(IComputeTask<TJobRes, TReduceRes> task)
{
_curFut.Value = Compute.Execute(task, null);
- return default(TR);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+ public TReduceRes Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg)
{
- _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg);
+ _curFut.Value = Compute.Execute<TArg, TJobRes, TReduceRes>(taskType, taskArg);
- return default(TR);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public TR Execute<T, TR>(Type taskType)
+ public TReduceRes Execute<TJobRes, TReduceRes>(Type taskType)
{
- _curFut.Value = Compute.Execute<object, T, TR>(taskType, null);
+ _curFut.Value = Compute.Execute<object, TJobRes, TReduceRes>(taskType, null);
- return default(TR);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public TR Call<TR>(IComputeFunc<TR> clo)
+ public TJobRes Call<TJobRes>(IComputeFunc<TJobRes> clo)
{
_curFut.Value = Compute.Execute(clo);
- return default(TR);
+ return default(TJobRes);
}
/** <inheritDoc /> */
- public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ public TJobRes AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo)
{
Compute.AffinityCall(cacheName, affinityKey, clo);
- return default(TR);
+ return default(TJobRes);
}
/** <inheritDoc /> */
- public TR Call<TR>(Func<TR> func)
+ public TJobRes Call<TJobRes>(Func<TJobRes> func)
{
_curFut.Value = Compute.Execute(func);
- return default(TR);
+ return default(TJobRes);
}
/** <inheritDoc /> */
- public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ public ICollection<TJobRes> Call<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos)
{
_curFut.Value = Compute.Execute(clos);
@@ -189,15 +189,15 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ public TReduceRes Call<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos, IComputeReducer<TJobRes, TReduceRes> reducer)
{
- _curFut.Value = Compute.Execute(clos, rdc);
+ _curFut.Value = Compute.Execute(clos, reducer);
- return default(TR2);
+ return default(TReduceRes);
}
/** <inheritDoc /> */
- public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+ public ICollection<TJobRes> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo)
{
_curFut.Value = Compute.Broadcast(clo);
@@ -205,7 +205,7 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public ICollection<TJobRes> Broadcast<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
_curFut.Value = Compute.Broadcast(clo, arg);
@@ -237,15 +237,15 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public TJobRes Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
_curFut.Value = Compute.Apply(clo, arg);
- return default(TR);
+ return default(TJobRes);
}
/** <inheritDoc /> */
- public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ public ICollection<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, IEnumerable<TArg> args)
{
_curFut.Value = Compute.Apply(clo, args);
@@ -253,11 +253,12 @@ namespace Apache.Ignite.Core.Impl.Compute
}
/** <inheritDoc /> */
- public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+ public TReduceRes Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo,
+ IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc)
{
_curFut.Value = Compute.Apply(clo, args, rdc);
- return default(TR2);
+ return default(TReduceRes);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/81feb959/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index f0ff968..abd54da 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -124,7 +124,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
/// then 'taskName' will be used as task class name to auto-deploy the task.
/// </summary>
- public T ExecuteJavaTask<T>(string taskName, object taskArg)
+ public TReduceRes ExecuteJavaTask<TReduceRes>(string taskName, object taskArg)
{
IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
@@ -132,7 +132,7 @@ namespace Apache.Ignite.Core.Impl.Compute
try
{
- T res = DoOutInOp<T>(OpExec, writer =>
+ TReduceRes res = DoOutInOp<TReduceRes>(OpExec, writer =>
{
WriteTask(writer, taskName, taskArg, nodes);
});
@@ -150,7 +150,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/// If task for given name has not been deployed yet,
/// then 'taskName' will be used as task class name to auto-deploy the task.
/// </summary>
- public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
+ public IFuture<TReduceRes> ExecuteJavaTaskAsync<TReduceRes>(string taskName, object taskArg)
{
IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
@@ -158,14 +158,14 @@ namespace Apache.Ignite.Core.Impl.Compute
try
{
- IFuture<T> fut = null;
+ IFuture<TReduceRes> fut = null;
DoOutInOp(OpExecAsync, writer =>
{
WriteTask(writer, taskName, taskArg, nodes);
}, input =>
{
- fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
+ fut = GetFuture<TReduceRes>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
});
return fut;
@@ -183,11 +183,12 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="task">Task to execute.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
- public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+ public IFuture<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(IComputeTask<TArg, TJobRes, TReduceRes> task,
+ TArg taskArg)
{
IgniteArgumentCheck.NotNull(task, "task");
- var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);
+ var holder = new ComputeTaskHolder<TArg, TJobRes, TReduceRes>((Ignite) _prj.Ignite, this, task, taskArg);
long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder);
@@ -203,13 +204,13 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="taskType">Task type.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
- public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
+ public IFuture<TReduceRes> Execute<TArg, TJobRes, TReduceRes>(Type taskType, TArg taskArg)
{
IgniteArgumentCheck.NotNull(taskType, "taskType");
object task = FormatterServices.GetUninitializedObject(taskType);
- var task0 = task as IComputeTask<TA, T, TR>;
+ var task0 = task as IComputeTask<TArg, TJobRes, TReduceRes>;
if (task0 == null)
throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
@@ -223,11 +224,11 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="clo">Job to execute.</param>
/// <returns>Job result for this execution.</returns>
- public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
+ public IFuture<TJobRes> Execute<TJobRes>(IComputeFunc<TJobRes> clo)
{
IgniteArgumentCheck.NotNull(clo, "clo");
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TJobRes, TJobRes>(),
new ComputeOutFuncJob(clo.ToNonGeneric()), null, false);
}
@@ -237,13 +238,13 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="func">Func to execute.</param>
/// <returns>Job result for this execution.</returns>
- public IFuture<TR> Execute<TR>(Func<TR> func)
+ public IFuture<TJobRes> Execute<TJobRes>(Func<TJobRes> func)
{
IgniteArgumentCheck.NotNull(func, "func");
var wrappedFunc = new ComputeOutFuncWrapper(func, () => func());
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TJobRes, TJobRes>(),
new ComputeOutFuncJob(wrappedFunc), null, false);
}
@@ -252,16 +253,16 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="clos">Collection of jobs to execute.</param>
/// <returns>Collection of job results for this execution.</returns>
- public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
+ public IFuture<ICollection<TJobRes>> Execute<TJobRes>(IEnumerable<IComputeFunc<TJobRes>> clos)
{
IgniteArgumentCheck.NotNull(clos, "clos");
ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(clos));
- foreach (IComputeFunc<TR> clo in clos)
+ foreach (IComputeFunc<TJobRes> clo in clos)
jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobs.Count),
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TJobRes, ICollection<TJobRes>>(jobs.Count),
null, jobs, false);
}
@@ -271,7 +272,8 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="clos">Collection of jobs to execute.</param>
/// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
/// <returns>Collection of job results for this execution.</returns>
- public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+ public IFuture<TReduceRes> Execute<TJobRes, TReduceRes>(IEnumerable<IComputeFunc<TJobRes>> clos,
+ IComputeReducer<TJobRes, TReduceRes> rdc)
{
IgniteArgumentCheck.NotNull(clos, "clos");
@@ -280,7 +282,7 @@ namespace Apache.Ignite.Core.Impl.Compute
foreach (var clo in clos)
jobs.Add(new ComputeOutFuncJob(clo.ToNonGeneric()));
- return ExecuteClosures0(new ComputeReducingClosureTask<object, TR1, TR2>(rdc), null, jobs, false);
+ return ExecuteClosures0(new ComputeReducingClosureTask<object, TJobRes, TReduceRes>(rdc), null, jobs, false);
}
/// <summary>
@@ -288,11 +290,11 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="clo">Job to broadcast to all projection nodes.</param>
/// <returns>Collection of results for this execution.</returns>
- public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
+ public IFuture<ICollection<TJobRes>> Broadcast<TJobRes>(IComputeFunc<TJobRes> clo)
{
IgniteArgumentCheck.NotNull(clo, "clo");
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TJobRes, ICollection<TJobRes>>(1),
new ComputeOutFuncJob(clo.ToNonGeneric()), null, true);
}
@@ -303,11 +305,11 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="clo">Job to broadcast to all projection nodes.</param>
/// <param name="arg">Job closure argument.</param>
/// <returns>Collection of results for this execution.</returns>
- public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public IFuture<ICollection<TJobRes>> Broadcast<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
IgniteArgumentCheck.NotNull(clo, "clo");
- return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1),
+ return ExecuteClosures0(new ComputeMultiClosureTask<object, TJobRes, ICollection<TJobRes>>(1),
new ComputeFuncJob(clo.ToNonGeneric(), arg), null, true);
}
@@ -367,11 +369,11 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="clo">Job to run.</param>
/// <param name="arg">Job argument.</param>
/// <returns>Job result for this execution.</returns>
- public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+ public IFuture<TJobRes> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo, TArg arg)
{
IgniteArgumentCheck.NotNull(clo, "clo");
- return ExecuteClosures0(new ComputeSingleClosureTask<T, TR, TR>(),
+ return ExecuteClosures0(new ComputeSingleClosureTask<TArg, TJobRes, TJobRes>(),
new ComputeFuncJob(clo.ToNonGeneric(), arg), null, false);
}
@@ -383,7 +385,8 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="clo">Job to run.</param>
/// <param name="args">Job arguments.</param>
/// <returns>Collection of job results.</returns>
- public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+ public IFuture<ICollection<TJobRes>> Apply<TArg, TJobRes>(IComputeFunc<TArg, TJobRes> clo,
+ IEnumerable<TArg> args)
{
IgniteArgumentCheck.NotNull(clo, "clo");
@@ -393,10 +396,10 @@ namespace Apache.Ignite.Core.Impl.Compute
var func = clo.ToNonGeneric();
- foreach (T arg in args)
+ foreach (TArg arg in args)
jobs.Add(new ComputeFuncJob(func, arg));
- return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
+ return ExecuteClosures0(new ComputeMultiClosureTask<TArg, TJobRes, ICollection<TJobRes>>(jobs.Count),
null, jobs, false);
}
@@ -410,8 +413,8 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="args">Job arguments.</param>
/// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
/// <returns>Reduced job result for this execution.</returns>
- public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
- IComputeReducer<TR1, TR2> rdc)
+ public IFuture<TReduceRes> Apply<TArg, TJobRes, TReduceRes>(IComputeFunc<TArg, TJobRes> clo,
+ IEnumerable<TArg> args, IComputeReducer<TJobRes, TReduceRes> rdc)
{
IgniteArgumentCheck.NotNull(clo, "clo");
@@ -423,10 +426,10 @@ namespace Apache.Ignite.Core.Impl.Compute
var func = clo.ToNonGeneric();
- foreach (T arg in args)
+ foreach (TArg arg in args)
jobs.Add(new ComputeFuncJob(func, arg));
- return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
+ return ExecuteClosures0(new ComputeReducingClosureTask<TArg, TJobRes, TReduceRes>(rdc),
null, jobs, false);
}
@@ -454,12 +457,12 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="affinityKey">Affinity key.</param>
/// <param name="clo">Job to execute.</param>
/// <returns>Job result for this execution.</returns>
- /// <typeparam name="TR">Type of job result.</typeparam>
- public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+ /// <typeparam name="TJobRes">Type of job result.</typeparam>
+ public IFuture<TJobRes> AffinityCall<TJobRes>(string cacheName, object affinityKey, IComputeFunc<TJobRes> clo)
{
IgniteArgumentCheck.NotNull(clo, "clo");
- return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(),
+ return ExecuteClosures0(new ComputeSingleClosureTask<object, TJobRes, TJobRes>(),
new ComputeOutFuncJob(clo.ToNonGeneric()), opId: OpAffinity,
writeAction: w => WriteAffinity(w, cacheName, affinityKey));
}
@@ -480,7 +483,8 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="jobs">Jobs.</param>
/// <param name="broadcast">Broadcast flag.</param>
/// <returns>Future.</returns>
- private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job,
+ private IFuture<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>(
+ IComputeTask<TArg, TJobRes, TReduceRes> task, IComputeJob job,
ICollection<IComputeJob> jobs, bool broadcast)
{
return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast,
@@ -499,13 +503,14 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <returns>Future.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "User code can throw any exception")]
- private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null,
+ private IFuture<TReduceRes> ExecuteClosures0<TArg, TJobRes, TReduceRes>(
+ IComputeTask<TArg, TJobRes, TReduceRes> task, IComputeJob job = null,
IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0,
Action<PortableWriterImpl> writeAction = null)
{
Debug.Assert(job != null || jobs != null);
- var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA));
+ var holder = new ComputeTaskHolder<TArg, TJobRes, TReduceRes>((Ignite) _prj.Ignite, this, task, default(TArg));
var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder);
[3/6] ignite git commit: IGNITE-1662: Renamed IMessageFilter to
IMessageListener.
Posted by vo...@apache.org.
IGNITE-1662: Renamed IMessageFilter to IMessageListener.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a77dd3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a77dd3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a77dd3a
Branch: refs/heads/ignite-1655
Commit: 2a77dd3a7f01208d7172b66f6520cfc0e6615570
Parents: 81feb95
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Thu Oct 15 12:20:58 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 12:20:58 2015 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.Tests/EventsTest.cs | 2 +-
.../IgniteStartStopTest.cs | 4 +-
.../Apache.Ignite.Core.Tests/MessagingTest.cs | 16 +-
.../Apache.Ignite.Core.csproj | 4 +-
.../dotnet/Apache.Ignite.Core/Events/IEvents.cs | 2 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 16 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 2 +-
.../Impl/Messaging/MessageFilterHolder.cs | 177 -------------------
.../Impl/Messaging/MessageListenerHolder.cs | 177 +++++++++++++++++++
.../Impl/Messaging/Messaging.cs | 22 +--
.../Impl/Portable/PortableMarshaller.cs | 2 +-
.../Impl/Portable/PortableUtils.cs | 2 +-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 4 +-
.../Messaging/IMessageFilter.cs | 35 ----
.../Messaging/IMessageListener.cs | 38 ++++
.../Apache.Ignite.Core/Messaging/IMessaging.cs | 15 +-
.../Events/LocalListener.cs | 2 +-
.../Messaging/LocalListener.cs | 2 +-
.../Messaging/RemoteOrderedListener.cs | 2 +-
.../Messaging/RemoteUnorderedListener.cs | 2 +-
20 files changed, 266 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
index c271aa6..b325d36 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTest.cs
@@ -383,7 +383,7 @@ namespace Apache.Ignite.Core.Tests
var expectedType = EventType.JobStarted;
var remoteFilter = portable
- ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
+ ? (IEventFilter<IEvent>) new RemoteEventPortableFilter(expectedType)
: new RemoteEventFilter(expectedType);
var localListener = EventsTestHelper.GetListener();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index bd776ce..d16063f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -384,7 +384,7 @@ namespace Apache.Ignite.Core.Tests
// to test race conditions during processor init on remote node
var listenTask = Task.Factory.StartNew(() =>
{
- var filter = new MessageFilter();
+ var filter = new MessageListener();
while (!token.IsCancellationRequested)
{
@@ -410,7 +410,7 @@ namespace Apache.Ignite.Core.Tests
/// Noop message filter.
/// </summary>
[Serializable]
- private class MessageFilter : IMessageFilter<int>
+ private class MessageListener : IMessageListener<int>
{
/** <inheritdoc /> */
public bool Invoke(Guid nodeId, int message)
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
index 95e48d3..55f2e6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
@@ -155,7 +155,7 @@ namespace Apache.Ignite.Core.Tests
{
var grid3GotMessage = false;
- var grid3Listener = new MessageFilter<string>((id, x) =>
+ var grid3Listener = new MessageListener<string>((id, x) =>
{
grid3GotMessage = true;
return true;
@@ -199,7 +199,7 @@ namespace Apache.Ignite.Core.Tests
var sharedReceived = 0;
- var sharedListener = new MessageFilter<string>((id, x) =>
+ var sharedListener = new MessageListener<string>((id, x) =>
{
Interlocked.Increment(ref sharedReceived);
Thread.MemoryBarrier();
@@ -220,7 +220,7 @@ namespace Apache.Ignite.Core.Tests
var localReceived = 0;
var stopLocal = 0;
- var localListener = new MessageFilter<string>((id, x) =>
+ var localListener = new MessageListener<string>((id, x) =>
{
Interlocked.Increment(ref localReceived);
Thread.MemoryBarrier();
@@ -569,9 +569,9 @@ namespace Apache.Ignite.Core.Tests
/// Gets the message listener.
/// </summary>
/// <returns>New instance of message listener.</returns>
- public static IMessageFilter<string> GetListener()
+ public static IMessageListener<string> GetListener()
{
- return new MessageFilter<string>(Listen);
+ return new MessageListener<string>(Listen);
}
/// <summary>
@@ -616,7 +616,7 @@ namespace Apache.Ignite.Core.Tests
/// Test message filter.
/// </summary>
[Serializable]
- public class MessageFilter<T> : IMessageFilter<T>
+ public class MessageListener<T> : IMessageListener<T>
{
/** */
private readonly Func<Guid, T, bool> _invoke;
@@ -628,10 +628,10 @@ namespace Apache.Ignite.Core.Tests
#pragma warning restore 649
/// <summary>
- /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class.
+ /// Initializes a new instance of the <see cref="MessageListener{T}"/> class.
/// </summary>
/// <param name="invoke">The invoke delegate.</param>
- public MessageFilter(Func<Guid, T, bool> invoke)
+ public MessageListener(Func<Guid, T, bool> invoke)
{
_invoke = invoke;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 848ce49..a10a0a5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -238,7 +238,7 @@
<Compile Include="Impl\Memory\PlatformPooledMemory.cs" />
<Compile Include="Impl\Memory\PlatformRawMemory.cs" />
<Compile Include="Impl\Memory\PlatformUnpooledMemory.cs" />
- <Compile Include="Impl\Messaging\MessageFilterHolder.cs" />
+ <Compile Include="Impl\Messaging\MessageListenerHolder.cs" />
<Compile Include="Impl\Messaging\Messaging.cs" />
<Compile Include="Impl\Messaging\MessagingAsync.cs" />
<Compile Include="Impl\NativeMethods.cs" />
@@ -307,7 +307,7 @@
<Compile Include="Impl\Unmanaged\UnmanagedUtils.cs" />
<Compile Include="Lifecycle\ILifecycleBean.cs" />
<Compile Include="Lifecycle\LifecycleEventType.cs" />
- <Compile Include="Messaging\IMessageFilter.cs" />
+ <Compile Include="Messaging\IMessageListener.cs" />
<Compile Include="Messaging\IMessaging.cs" />
<Compile Include="Portable\IPortableBuilder.cs" />
<Compile Include="Portable\IPortableIdMapper.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
index be38104..b2f07d4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/IEvents.cs
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Core.Events
/// </returns>
[AsyncSupported]
Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
where T : IEvent;
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 8b97884..0f2b3c1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -58,7 +58,7 @@ namespace Apache.Ignite.Core.Impl.Common
_cacheEntryProcessor;
/** */
- private readonly Func<object, Guid, object, bool> _messageFilter;
+ private readonly Func<object, Guid, object, bool> _messageLsnr;
/** */
private readonly Func<object, object> _computeJobExecute;
@@ -136,13 +136,13 @@ namespace Apache.Ignite.Core.Impl.Common
}
/// <summary>
- /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+ /// Gets the <see cref="IMessageListener{T}" /> invocator.
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Precompiled invocator delegate.</returns>
- public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+ public static Func<object, Guid, object, bool> GetMessageListener(Type type)
{
- return Get(type)._messageFilter;
+ return Get(type)._messageLsnr;
}
/// <summary>
@@ -286,18 +286,18 @@ namespace Apache.Ignite.Core.Impl.Common
_streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
new[] {iface});
}
- else if (genericTypeDefinition == typeof (IMessageFilter<>))
+ else if (genericTypeDefinition == typeof (IMessageListener<>))
{
- ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+ ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IMessageListener<>));
var arg = iface.GetGenericArguments()[0];
- _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ _messageLsnr = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
new[] { typeof(Guid), arg }, new[] { false, true, false });
}
else if (genericTypeDefinition == typeof (IComputeJob<>))
{
- ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>));
+ ThrowIfMultipleInterfaces(_messageLsnr, type, typeof(IComputeJob<>));
_computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
methodName: "Execute");
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index f4cc341..08936e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -164,7 +164,7 @@ namespace Apache.Ignite.Core.Impl.Events
/** <inheritDoc /> */
public Guid? RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
- IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, IEnumerable<int> types = null)
where T : IEvent
{
return RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, TypesToArray(types));
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
deleted file mode 100644
index 8666e9b..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageFilterHolder.cs
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Messaging
-{
- using System;
- using System.Diagnostics;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Handle;
- using Apache.Ignite.Core.Impl.Portable;
- using Apache.Ignite.Core.Impl.Portable.IO;
- using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Messaging;
- using Apache.Ignite.Core.Portable;
-
- /// <summary>
- /// Non-generic portable filter wrapper.
- /// </summary>
- internal class MessageFilterHolder : IPortableWriteAware, IHandle
- {
- /** Invoker function that takes key and value and invokes wrapped IMessageFilter */
- private readonly Func<Guid, object, bool> _invoker;
-
- /** Current Ignite instance. */
- private readonly Ignite _ignite;
-
- /** Underlying filter. */
- private readonly object _filter;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="MessageFilterHolder" /> class.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="filter">The <see cref="IMessageFilter{T}" /> to wrap.</param>
- /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageFilter.</param>
- private MessageFilterHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
- {
- Debug.Assert(filter != null);
- Debug.Assert(invoker != null);
-
- _invoker = invoker;
-
- _filter = filter;
-
- // 1. Set fields.
- Debug.Assert(grid != null);
-
- _ignite = grid;
- _invoker = invoker;
-
- // 2. Perform injections.
- ResourceProcessor.Inject(filter, grid);
- }
-
- /// <summary>
- /// Invoke the filter.
- /// </summary>
- /// <param name="input">Input.</param>
- /// <returns></returns>
- public int Invoke(IPortableStream input)
- {
- var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
-
- var nodeId = rawReader.ReadGuid();
-
- Debug.Assert(nodeId != null);
-
- return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
- }
-
- /// <summary>
- /// Wrapped <see cref="IMessageFilter{T}" />.
- /// </summary>
- public object Filter
- {
- get { return _filter; }
- }
-
- /// <summary>
- /// Destroy callback.
- /// </summary>
- public Action DestroyAction { private get; set; }
-
- /** <inheritDoc /> */
- public void Release()
- {
- if (DestroyAction != null)
- DestroyAction();
- }
-
- /** <inheritDoc /> */
- public bool Released
- {
- get { return false; } // Multiple releases are allowed.
- }
-
- /// <summary>
- /// Creates local holder instance.
- /// </summary>
- /// <param name="grid">Ignite instance.</param>
- /// <param name="filter">Filter.</param>
- /// <returns>
- /// New instance of <see cref="MessageFilterHolder" />
- /// </returns>
- public static MessageFilterHolder CreateLocal<T>(Ignite grid, IMessageFilter<T> filter)
- {
- Debug.Assert(filter != null);
-
- return new MessageFilterHolder(grid, filter, (id, msg) => filter.Invoke(id, (T)msg));
- }
-
- /// <summary>
- /// Creates remote holder instance.
- /// </summary>
- /// <param name="grid">Grid.</param>
- /// <param name="memPtr">Memory pointer.</param>
- /// <returns>Deserialized instance of <see cref="MessageFilterHolder"/></returns>
- public static MessageFilterHolder CreateRemote(Ignite grid, long memPtr)
- {
- Debug.Assert(grid != null);
-
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return grid.Marshaller.Unmarshal<MessageFilterHolder>(stream);
- }
- }
-
- /// <summary>
- /// Gets the invoker func.
- /// </summary>
- private static Func<Guid, object, bool> GetInvoker(object pred)
- {
- var func = DelegateTypeDescriptor.GetMessageFilter(pred.GetType());
-
- return (id, msg) => func(pred, id, msg);
- }
-
- /** <inheritdoc /> */
- public void WritePortable(IPortableWriter writer)
- {
- var writer0 = (PortableWriterImpl)writer.GetRawWriter();
-
- writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="MessageFilterHolder"/> class.
- /// </summary>
- /// <param name="reader">The reader.</param>
- public MessageFilterHolder(IPortableReader reader)
- {
- var reader0 = (PortableReaderImpl)reader.GetRawReader();
-
- _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
-
- _invoker = GetInvoker(_filter);
-
- _ignite = reader0.Marshaller.Ignite;
-
- ResourceProcessor.Inject(_filter, _ignite);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
new file mode 100644
index 0000000..412a84e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/MessageListenerHolder.cs
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Messaging
+{
+ using System;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Handle;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Messaging;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Non-generic portable message listener wrapper.
+ /// </summary>
+ internal class MessageListenerHolder : IPortableWriteAware, IHandle
+ {
+ /** Invoker function that takes key and value and invokes wrapped IMessageListener */
+ private readonly Func<Guid, object, bool> _invoker;
+
+ /** Current Ignite instance. */
+ private readonly Ignite _ignite;
+
+ /** Underlying filter. */
+ private readonly object _filter;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="MessageListenerHolder" /> class.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="filter">The <see cref="IMessageListener{T}" /> to wrap.</param>
+ /// <param name="invoker">The invoker func that takes key and value and invokes wrapped IMessageListener.</param>
+ private MessageListenerHolder(Ignite grid, object filter, Func<Guid, object, bool> invoker)
+ {
+ Debug.Assert(filter != null);
+ Debug.Assert(invoker != null);
+
+ _invoker = invoker;
+
+ _filter = filter;
+
+ // 1. Set fields.
+ Debug.Assert(grid != null);
+
+ _ignite = grid;
+ _invoker = invoker;
+
+ // 2. Perform injections.
+ ResourceProcessor.Inject(filter, grid);
+ }
+
+ /// <summary>
+ /// Invoke the filter.
+ /// </summary>
+ /// <param name="input">Input.</param>
+ /// <returns></returns>
+ public int Invoke(IPortableStream input)
+ {
+ var rawReader = _ignite.Marshaller.StartUnmarshal(input).GetRawReader();
+
+ var nodeId = rawReader.ReadGuid();
+
+ Debug.Assert(nodeId != null);
+
+ return _invoker(nodeId.Value, rawReader.ReadObject<object>()) ? 1 : 0;
+ }
+
+ /// <summary>
+ /// Wrapped <see cref="IMessageListener{T}" />.
+ /// </summary>
+ public object Filter
+ {
+ get { return _filter; }
+ }
+
+ /// <summary>
+ /// Destroy callback.
+ /// </summary>
+ public Action DestroyAction { private get; set; }
+
+ /** <inheritDoc /> */
+ public void Release()
+ {
+ if (DestroyAction != null)
+ DestroyAction();
+ }
+
+ /** <inheritDoc /> */
+ public bool Released
+ {
+ get { return false; } // Multiple releases are allowed.
+ }
+
+ /// <summary>
+ /// Creates local holder instance.
+ /// </summary>
+ /// <param name="grid">Ignite instance.</param>
+ /// <param name="listener">Filter.</param>
+ /// <returns>
+ /// New instance of <see cref="MessageListenerHolder" />
+ /// </returns>
+ public static MessageListenerHolder CreateLocal<T>(Ignite grid, IMessageListener<T> listener)
+ {
+ Debug.Assert(listener != null);
+
+ return new MessageListenerHolder(grid, listener, (id, msg) => listener.Invoke(id, (T)msg));
+ }
+
+ /// <summary>
+ /// Creates remote holder instance.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="memPtr">Memory pointer.</param>
+ /// <returns>Deserialized instance of <see cref="MessageListenerHolder"/></returns>
+ public static MessageListenerHolder CreateRemote(Ignite grid, long memPtr)
+ {
+ Debug.Assert(grid != null);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return grid.Marshaller.Unmarshal<MessageListenerHolder>(stream);
+ }
+ }
+
+ /// <summary>
+ /// Gets the invoker func.
+ /// </summary>
+ private static Func<Guid, object, bool> GetInvoker(object pred)
+ {
+ var func = DelegateTypeDescriptor.GetMessageListener(pred.GetType());
+
+ return (id, msg) => func(pred, id, msg);
+ }
+
+ /** <inheritdoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var writer0 = (PortableWriterImpl)writer.GetRawWriter();
+
+ writer0.WithDetach(w => PortableUtils.WritePortableOrSerializable(w, Filter));
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="MessageListenerHolder"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public MessageListenerHolder(IPortableReader reader)
+ {
+ var reader0 = (PortableReaderImpl)reader.GetRawReader();
+
+ _filter = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+
+ _invoker = GetInvoker(_filter);
+
+ _ignite = reader0.Marshaller.Ignite;
+
+ ResourceProcessor.Inject(_filter, _ignite);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 8170a91..4ccbc3e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -113,17 +113,17 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public void LocalListen<T>(IMessageFilter<T> filter, object topic = null)
+ public void LocalListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
- ResourceProcessor.Inject(filter, _ignite);
+ ResourceProcessor.Inject(listener, _ignite);
lock (_funcMap)
{
- var key = GetKey(filter, topic);
+ var key = GetKey(listener, topic);
- MessageFilterHolder filter0 = MessageFilterHolder.CreateLocal(_ignite, filter);
+ MessageListenerHolder filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
var filterHnd = _ignite.HandleRegistry.Allocate(filter0);
@@ -155,16 +155,16 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null)
+ public void StopLocalListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
long filterHnd;
bool removed;
lock (_funcMap)
{
- removed = _funcMap.TryRemove(GetKey(filter, topic), out filterHnd);
+ removed = _funcMap.TryRemove(GetKey(listener, topic), out filterHnd);
}
if (removed)
@@ -178,11 +178,11 @@ namespace Apache.Ignite.Core.Impl.Messaging
}
/** <inheritdoc /> */
- public Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null)
+ public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(filter, "filter");
+ IgniteArgumentCheck.NotNull(listener, "filter");
- var filter0 = MessageFilterHolder.CreateLocal(_ignite, filter);
+ var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
try
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
index 67d8f2b..6499946 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
@@ -516,7 +516,7 @@ namespace Apache.Ignite.Core.Impl.Portable
AddSystemType(PortableUtils.TypeSerializableHolder, w => new SerializableObjectHolder(w));
AddSystemType(PortableUtils.TypeCacheEntryProcessorHolder, w => new CacheEntryProcessorHolder(w));
AddSystemType(PortableUtils.TypeCacheEntryPredicateHolder, w => new CacheEntryFilterHolder(w));
- AddSystemType(PortableUtils.TypeMessageFilterHolder, w => new MessageFilterHolder(w));
+ AddSystemType(PortableUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w));
AddSystemType(PortableUtils.TypePortableOrSerializableHolder, w => new PortableOrSerializableObjectHolder(w));
AddSystemType(PortableUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
index f80a199..c7be496 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
@@ -200,7 +200,7 @@ namespace Apache.Ignite.Core.Impl.Portable
public const byte TypeProductLicense = 78;
/** Type: message filter holder. */
- public const byte TypeMessageFilterHolder = 92;
+ public const byte TypeMessageListenerHolder = 92;
/** Type: message filter holder. */
public const byte TypePortableOrSerializableHolder = 93;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index f9949f3..3295904 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -788,7 +788,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
return SafeCall(() =>
{
- MessageFilterHolder holder = MessageFilterHolder.CreateRemote(_ignite, memPtr);
+ MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr);
return _ignite.HandleRegistry.AllocateSafe(holder);
});
@@ -798,7 +798,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
return SafeCall(() =>
{
- var holder = _ignite.HandleRegistry.Get<MessageFilterHolder>(ptr, false);
+ var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
if (holder == null)
return 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
deleted file mode 100644
index 456c5e6..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageFilter.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Messaging
-{
- using System;
-
- /// <summary>
- /// Represents messaging filter predicate.
- /// </summary>
- public interface IMessageFilter<in T>
- {
- /// <summary>
- /// Returns a value indicating whether provided message and node id satisfy this predicate.
- /// </summary>
- /// <param name="nodeId">Node identifier.</param>
- /// <param name="message">Message.</param>
- /// <returns>Value indicating whether provided message and node id satisfy this predicate.</returns>
- bool Invoke(Guid nodeId, T message);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
new file mode 100644
index 0000000..393a670
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessageListener.cs
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Messaging
+{
+ using System;
+
+ /// <summary>
+ /// Represents messaging filter predicate.
+ /// </summary>
+ public interface IMessageListener<in T>
+ {
+ /// <summary>
+ /// Invokes the message listener when a message arrives.
+ /// </summary>
+ /// <param name="nodeId">Message source node identifier.</param>
+ /// <param name="message">Message.</param>
+ /// <returns>
+ /// Value indicating whether this instance should remain subscribed.
+ /// Returning <c>false</c> will unsubscribe this message listener from further notifications.
+ /// </returns>
+ bool Invoke(Guid nodeId, T message);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
index 96f46b9..f846745 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Messaging/IMessaging.cs
@@ -67,19 +67,19 @@ namespace Apache.Ignite.Core.Messaging
/// node within the cluster group will send a message for a given topic to this node. Local listen
/// subscription will happen regardless of whether local node belongs to this cluster group or not.
/// </summary>
- /// <param name="filter">
+ /// <param name="listener">
/// Predicate that is called on each received message. If predicate returns false,
/// then it will be unsubscribed from any further notifications.
/// </param>
/// <param name="topic">Topic to subscribe to.</param>
- void LocalListen<T>(IMessageFilter<T> filter, object topic = null);
+ void LocalListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Unregisters local listener for given topic on local node only.
/// </summary>
- /// <param name="filter">Listener predicate.</param>
+ /// <param name="listener">Listener predicate.</param>
/// <param name="topic">Topic to unsubscribe from.</param>
- void StopLocalListen<T>(IMessageFilter<T> filter, object topic = null);
+ void StopLocalListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Adds a message listener for a given topic to all nodes in the cluster group (possibly including
@@ -87,13 +87,16 @@ namespace Apache.Ignite.Core.Messaging
/// group can send a message for a given topic and all nodes within the cluster group will receive
/// listener notifications.
/// </summary>
- /// <param name="filter">Listener predicate.</param>
+ /// <param name="listener">
+ /// Predicate that is called on each received message. If predicate returns false,
+ /// then it will be unsubscribed from any further notifications.
+ /// </param>
/// <param name="topic">Topic to unsubscribe from.</param>
/// <returns>
/// Operation ID that can be passed to <see cref="StopRemoteListen"/> method to stop listening.
/// </returns>
[AsyncSupported]
- Guid RemoteListen<T>(IMessageFilter<T> filter, object topic = null);
+ Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null);
/// <summary>
/// Unregisters all listeners identified with provided operation ID on all nodes in the cluster group.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
index 5cdb20c..067bd2a 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Events/LocalListener.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
index 7659bb4..591d426 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/LocalListener.cs
@@ -24,7 +24,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// <summary>
/// Local message listener which signals countdown event on each received message.
/// </summary>
- public class LocalListener : IMessageFilter<int>
+ public class LocalListener : IMessageListener<int>
{
/** Countdown event. */
private readonly CountdownEvent _countdown;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
index 8ae5ac1..85538c2 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteOrderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// Listener for Ordered topic.
/// </summary>
[Serializable]
- public class RemoteOrderedListener : IMessageFilter<int>
+ public class RemoteOrderedListener : IMessageListener<int>
{
/** Injected Ignite instance. */
[InstanceResource]
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a77dd3a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
index 166dbd6..ab23e8b 100644
--- a/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
+++ b/modules/platforms/dotnet/examples/Apache.Ignite.ExamplesDll/Messaging/RemoteUnorderedListener.cs
@@ -26,7 +26,7 @@ namespace Apache.Ignite.ExamplesDll.Messaging
/// Listener for Unordered topic.
/// </summary>
[Serializable]
- public class RemoteUnorderedListener : IMessageFilter<int>
+ public class RemoteUnorderedListener : IMessageListener<int>
{
/** Injected Ignite instance. */
[InstanceResource]
[4/6] ignite git commit: IGNITE-1610: Implemented portable reader and
writer for iterators.
Posted by vo...@apache.org.
IGNITE-1610: Implemented portable reader and writer for iterators.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6695e6c3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6695e6c3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6695e6c3
Branch: refs/heads/ignite-1655
Commit: 6695e6c3e46473c8b17d5a8fddd720ec7c6d91df
Parents: 2a77dd3
Author: isapego <is...@gridgain.com>
Authored: Thu Oct 15 13:38:31 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 13:38:31 2015 +0300
----------------------------------------------------------------------
.../src/portable_reader_writer_raw_test.cpp | 63 ++++++++++-
.../src/portable_reader_writer_test.cpp | 65 ++++++++++++
.../interop/interop_stream_position_guard.h | 79 ++++++++++++++
.../ignite/impl/portable/portable_reader_impl.h | 104 +++++++++++++++++++
.../ignite/impl/portable/portable_writer_impl.h | 55 ++++++++++
.../ignite/portable/portable_raw_reader.h | 26 +++++
.../ignite/portable/portable_raw_writer.h | 30 +++++-
.../include/ignite/portable/portable_reader.h | 29 ++++++
.../include/ignite/portable/portable_writer.h | 27 +++++
.../platforms/cpp/core/project/vs/core.vcxproj | 1 +
.../cpp/core/project/vs/core.vcxproj.filters | 3 +
.../src/impl/portable/portable_reader_impl.cpp | 75 +++++++++++++
.../core/src/portable/portable_raw_reader.cpp | 10 ++
.../cpp/core/src/portable/portable_reader.cpp | 10 ++
14 files changed, 574 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core-test/src/portable_reader_writer_raw_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/portable_reader_writer_raw_test.cpp b/modules/platforms/cpp/core-test/src/portable_reader_writer_raw_test.cpp
index c3a98aa..e93796f 100644
--- a/modules/platforms/cpp/core-test/src/portable_reader_writer_raw_test.cpp
+++ b/modules/platforms/cpp/core-test/src/portable_reader_writer_raw_test.cpp
@@ -495,6 +495,55 @@ void CheckRawCollection(CollectionType* colType)
BOOST_REQUIRE(rawReader.ReadInt8() == 1);
}
+void CheckRawCollectionIterators(CollectionType* colType)
+{
+ typedef std::vector<PortableInner> PortableInnerVector;
+
+ PortableInnerVector writeValues;
+ writeValues.push_back(1);
+ writeValues.push_back(0);
+ writeValues.push_back(2);
+
+ InteropUnpooledMemory mem(1024);
+
+ InteropOutputStream out(&mem);
+ PortableWriterImpl writer(&out, NULL);
+ PortableRawWriter rawWriter(&writer);
+
+ if (colType)
+ rawWriter.WriteCollection(writeValues.begin(), writeValues.end(), *colType);
+ else
+ rawWriter.WriteCollection(writeValues.begin(), writeValues.end());
+
+ rawWriter.WriteInt8(1);
+
+ out.Synchronize();
+
+ InteropInputStream in(&mem);
+ PortableReaderImpl reader(&in);
+ PortableRawReader rawReader(&reader);
+
+ int32_t collectionSize = rawReader.ReadCollectionSize();
+ BOOST_REQUIRE(collectionSize == writeValues.size());
+
+ if (colType)
+ BOOST_REQUIRE(rawReader.ReadCollectionType() == *colType);
+ else
+ BOOST_REQUIRE(rawReader.ReadCollectionType() == IGNITE_COLLECTION_UNDEFINED);
+
+ PortableInnerVector readValues(collectionSize);
+
+ int32_t elementsRead = rawReader.ReadCollection<PortableInner>(readValues.begin());
+
+ BOOST_REQUIRE(elementsRead == 3);
+
+ BOOST_REQUIRE(readValues[0].GetValue() == writeValues[0].GetValue());
+ BOOST_REQUIRE(readValues[1].GetValue() == writeValues[1].GetValue());
+ BOOST_REQUIRE(readValues[2].GetValue() == writeValues[2].GetValue());
+
+ BOOST_REQUIRE(rawReader.ReadInt8() == 1);
+}
+
void CheckRawMapEmpty(MapType* mapType)
{
InteropUnpooledMemory mem(1024);
@@ -1457,13 +1506,25 @@ BOOST_AUTO_TEST_CASE(TestCollection)
CheckRawCollection(NULL);
}
-BOOST_AUTO_TEST_CASE(testCollectionTyped)
+BOOST_AUTO_TEST_CASE(TestCollectionTyped)
{
CollectionType typ = IGNITE_COLLECTION_CONCURRENT_SKIP_LIST_SET;
CheckRawCollection(&typ);
}
+BOOST_AUTO_TEST_CASE(TestCollectionIterators)
+{
+ CheckRawCollectionIterators(NULL);
+}
+
+BOOST_AUTO_TEST_CASE(TestCollectionIteratorsTyped)
+{
+ CollectionType typ = IGNITE_COLLECTION_CONCURRENT_SKIP_LIST_SET;
+
+ CheckRawCollectionIterators(&typ);
+}
+
BOOST_AUTO_TEST_CASE(TestMapNull)
{
InteropUnpooledMemory mem(1024);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core-test/src/portable_reader_writer_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/portable_reader_writer_test.cpp b/modules/platforms/cpp/core-test/src/portable_reader_writer_test.cpp
index aff929b..0825ecc 100644
--- a/modules/platforms/cpp/core-test/src/portable_reader_writer_test.cpp
+++ b/modules/platforms/cpp/core-test/src/portable_reader_writer_test.cpp
@@ -572,6 +572,59 @@ void CheckCollection(CollectionType* colType)
BOOST_REQUIRE(reader.ReadInt8("field2") == 1);
}
+void CheckCollectionIterators(CollectionType* colType)
+{
+ typedef std::vector<PortableInner> PortableInnerVector;
+ PortableInnerVector writeValues;
+
+ writeValues.push_back(1);
+ writeValues.push_back(0);
+ writeValues.push_back(2);
+
+ TemplatedPortableIdResolver<PortableDummy> idRslvr;
+
+ InteropUnpooledMemory mem(1024);
+
+ InteropOutputStream out(&mem);
+ PortableWriterImpl writerImpl(&out, &idRslvr, NULL, NULL);
+ PortableWriter writer(&writerImpl);
+
+ out.Position(18);
+
+ if (colType)
+ writer.WriteCollection("field1", writeValues.begin(), writeValues.end(), *colType);
+ else
+ writer.WriteCollection("field1", writeValues.begin(), writeValues.end());
+
+ writer.WriteInt8("field2", 1);
+
+ out.Synchronize();
+
+ InteropInputStream in(&mem);
+ PortableReaderImpl readerImpl(&in, &idRslvr, 0, true, idRslvr.GetTypeId(), 0, 1000, 1000);
+ PortableReader reader(&readerImpl);
+
+ in.Position(18);
+
+ BOOST_REQUIRE(reader.ReadCollectionSize("field1") == writeValues.size());
+
+ CollectionType expectedCollectionType = colType ? *colType : IGNITE_COLLECTION_UNDEFINED;
+ BOOST_REQUIRE(reader.ReadCollectionType("field1") == expectedCollectionType);
+
+ PortableInnerVector readValues;
+ std::back_insert_iterator<PortableInnerVector> readInsertIterator(readValues);
+
+ reader.ReadCollection<PortableInner>("field1", readInsertIterator);
+
+ BOOST_REQUIRE(readValues.size() == 3);
+
+ BOOST_REQUIRE(readValues[0].GetValue() == writeValues[0].GetValue());
+ BOOST_REQUIRE(readValues[1].GetValue() == writeValues[1].GetValue());
+ BOOST_REQUIRE(readValues[2].GetValue() == writeValues[2].GetValue());
+
+ BOOST_REQUIRE(reader.ReadInt8("field2") == 1);
+}
+
void CheckMapEmpty(MapType* mapType)
{
TemplatedPortableIdResolver<PortableDummy> idRslvr;
@@ -1698,6 +1751,18 @@ BOOST_AUTO_TEST_CASE(testCollectionTyped)
CheckCollection(&typ);
}
+BOOST_AUTO_TEST_CASE(TestCollectionIterators)
+{
+ CheckCollectionIterators(NULL);
+}
+
+BOOST_AUTO_TEST_CASE(TestCollectionIteratorsTyped)
+{
+ CollectionType typ = IGNITE_COLLECTION_CONCURRENT_SKIP_LIST_SET;
+
+ CheckCollectionIterators(&typ);
+}
+
BOOST_AUTO_TEST_CASE(TestMapNull)
{
TemplatedPortableIdResolver<PortableDummy> idRslvr;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/impl/interop/interop_stream_position_guard.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_stream_position_guard.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_stream_position_guard.h
new file mode 100644
index 0000000..17ecf53
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_stream_position_guard.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_INTEROP_STREAM_POSITION_GUARD
+#define _IGNITE_IMPL_INTEROP_OUTPUT_POSITION_GUARD
+
+#include "ignite/impl/interop/interop_memory.h"
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace interop
+ {
+ /**
+ * Interop stream position guard.
+ */
+ template<typename T>
+ class IGNITE_IMPORT_EXPORT InteropStreamPositionGuard {
+ public:
+ /**
+ * Create new position guard and saves current stream position.
+ *
+ * @param stream Stream which position should be saved.
+ */
+ InteropStreamPositionGuard(T& stream) : stream(&stream), pos(stream.Position())
+ {
+ //No-op
+ }
+
+ /**
+ * Destructor.
+ *
+ * Restores stream's position to a saved one on destruction.
+ */
+ ~InteropStreamPositionGuard()
+ {
+ if (stream)
+ stream->Position(pos);
+ }
+
+ /**
+ * Releases guard so it will not restore streams position on destruction.
+ *
+ * @param val Value.
+ */
+ void Release()
+ {
+ stream = NULL;
+ }
+
+ private:
+ /** Stream. */
+ T* stream;
+
+ /** Saved position. */
+ int32_t pos;
+
+ IGNITE_NO_COPY_ASSIGNMENT(InteropStreamPositionGuard)
+ };
+ }
+ }
+}
+
+#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/impl/portable/portable_reader_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/portable/portable_reader_impl.h b/modules/platforms/cpp/core/include/ignite/impl/portable/portable_reader_impl.h
index ab93d10..5050a04 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/portable/portable_reader_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/portable/portable_reader_impl.h
@@ -518,6 +518,66 @@ namespace ignite
int32_t ReadCollection(const char* fieldName, ignite::portable::CollectionType* typ, int32_t* size);
/**
+ * Read values and insert them to specified position.
+ *
+ * @param out Output iterator to the initial position in the destination sequence.
+ * @return Actual amount of elements read.
+ */
+ template<typename T, typename OutputIterator>
+ int32_t ReadCollection(OutputIterator out)
+ {
+ int32_t size;
+ int32_t id = StartContainerSession(true, IGNITE_TYPE_COLLECTION, &size);
+
+ // Reading collection type. We don't need it here but it should be read.
+ if (size != -1)
+ stream->ReadInt8();
+
+ while (HasNextElement(id))
+ {
+ *out = ReadElement<T>(id);
+ ++out;
+ }
+
+ return size;
+ }
+
+ /**
+ * Read values and insert them to specified position.
+ *
+ * @param fieldName Field name.
+ * @param out Output iterator to the initial position in the destination sequence.
+ * @return Actual amount of elements read.
+ */
+ template<typename T, typename OutputIterator>
+ int32_t ReadCollection(const char* fieldName, OutputIterator out)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen <= 0)
+ return -1;
+
+ int32_t size;
+ int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, &size);
+
+ // Reading collection type. We don't need it here but it should be read.
+ if (size != -1)
+ stream->ReadInt8();
+
+ while (HasNextElement(id))
+ {
+ *out = ReadElement<T>(id);
+ ++out;
+ }
+
+ return size;
+ }
+
+ /**
* Start map read.
*
* @param typ Map type.
@@ -537,6 +597,36 @@ namespace ignite
int32_t ReadMap(const char* fieldName, ignite::portable::MapType* typ, int32_t* size);
/**
+ * Read type of the collection.
+ *
+ * @return Collection type.
+ */
+ ignite::portable::CollectionType ReadCollectionType();
+
+ /**
+ * Read type of the collection.
+ *
+ * @param fieldName Field name.
+ * @return Collection type.
+ */
+ ignite::portable::CollectionType ReadCollectionType(const char* fieldName);
+
+ /**
+ * Read size of the collection.
+ *
+ * @return Collection size.
+ */
+ int32_t ReadCollectionSize();
+
+ /**
+ * Read size of the collection.
+ *
+ * @param fieldName Field name.
+ * @return Collection size.
+ */
+ int32_t ReadCollectionSize(const char* fieldName);
+
+ /**
* Check whether next value exists.
*
* @param id Session ID.
@@ -1014,6 +1104,20 @@ namespace ignite
int32_t ReadStringInternal(char* res, const int32_t len);
/**
+ * Read type of the collection. Do not preserve stream position.
+ *
+ * @return Collection type.
+ */
+ ignite::portable::CollectionType ReadCollectionTypeUnprotected();
+
+ /**
+ * Read size of the collection. Do not preserve stream position.
+ *
+ * @return Collection size.
+ */
+ int32_t ReadCollectionSizeUnprotected();
+
+ /**
* Read value.
*
* @param expHdr Expected header.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/impl/portable/portable_writer_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/portable/portable_writer_impl.h b/modules/platforms/cpp/core/include/ignite/impl/portable/portable_writer_impl.h
index 2e5a0e7..0259a7e 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/portable/portable_writer_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/portable/portable_writer_impl.h
@@ -440,6 +440,40 @@ namespace ignite
* @return Session ID.
*/
int32_t WriteCollection(const char* fieldName, ignite::portable::CollectionType typ);
+
+ /**
+ * Write values in interval [first, last).
+ *
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollection(InputIterator first, InputIterator last, ignite::portable::CollectionType typ)
+ {
+ StartContainerSession(true);
+
+ WriteCollectionWithinSession(first, last, typ);
+ }
+
+ /**
+ * Write values in interval [first, last).
+ *
+ * @param fieldName Field name.
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollection(const char* fieldName, InputIterator first, InputIterator last,
+ ignite::portable::CollectionType typ)
+ {
+ StartContainerSession(false);
+
+ WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_COLLECTION);
+
+ WriteCollectionWithinSession(first, last, typ);
+ }
/**
* Start map write.
@@ -747,6 +781,27 @@ namespace ignite
}
/**
+ * Write values in interval [first, last).
+ * New session should be started prior to call to this method.
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollectionWithinSession(InputIterator first, InputIterator last,
+ ignite::portable::CollectionType typ)
+ {
+ stream->WriteInt8(IGNITE_TYPE_COLLECTION);
+ stream->Position(stream->Position() + 4);
+ stream->WriteInt8(typ);
+
+ for (InputIterator i = first; i != last; ++i)
+ WriteElement(elemId, *i);
+
+ CommitContainer(elemId);
+ }
+
+ /**
* Check raw mode.
*
* @param expected Expected raw mode of the reader.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/portable/portable_raw_reader.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/portable/portable_raw_reader.h b/modules/platforms/cpp/core/include/ignite/portable/portable_raw_reader.h
index 0ecaa4d..40abe8b 100644
--- a/modules/platforms/cpp/core/include/ignite/portable/portable_raw_reader.h
+++ b/modules/platforms/cpp/core/include/ignite/portable/portable_raw_reader.h
@@ -289,6 +289,18 @@ namespace ignite
}
/**
+ * Read values and insert them to specified position.
+ *
+ * @param out Output iterator to the initial position in the destination sequence.
+ * @return Number of elements that have been read.
+ */
+ template<typename T, typename OutputIterator>
+ int32_t ReadCollection(OutputIterator out)
+ {
+ return impl->ReadCollection<T>(out);
+ }
+
+ /**
* Start map read.
*
* @return Map reader.
@@ -305,6 +317,20 @@ namespace ignite
}
/**
+ * Read type of the collection.
+ *
+ * @return Collection type.
+ */
+ CollectionType ReadCollectionType();
+
+ /**
+ * Read type of the collection.
+ *
+ * @return Collection size.
+ */
+ int32_t ReadCollectionSize();
+
+ /**
* Read object.
*
* @return Object.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/portable/portable_raw_writer.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/portable/portable_raw_writer.h b/modules/platforms/cpp/core/include/ignite/portable/portable_raw_writer.h
index 4cf2f00..7d0c118 100644
--- a/modules/platforms/cpp/core/include/ignite/portable/portable_raw_writer.h
+++ b/modules/platforms/cpp/core/include/ignite/portable/portable_raw_writer.h
@@ -247,7 +247,7 @@ namespace ignite
* @return Collection writer.
*/
template<typename T>
- PortableCollectionWriter<T> WriteCollection(ignite::portable::CollectionType typ)
+ PortableCollectionWriter<T> WriteCollection(CollectionType typ)
{
int32_t id = impl->WriteCollection(typ);
@@ -255,6 +255,32 @@ namespace ignite
}
/**
+ * Write values in interval [first, last).
+ *
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollection(InputIterator first, InputIterator last)
+ {
+ impl->WriteCollection(first, last, IGNITE_COLLECTION_UNDEFINED);
+ }
+
+ /**
+ * Write values in interval [first, last).
+ *
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollection(InputIterator first, InputIterator last, CollectionType typ)
+ {
+ impl->WriteCollection(first, last, typ);
+ }
+
+ /**
* Start map write.
*
* @param typ Map type.
@@ -273,7 +299,7 @@ namespace ignite
* @return Map writer.
*/
template<typename K, typename V>
- PortableMapWriter<K, V> WriteMap(ignite::portable::MapType typ)
+ PortableMapWriter<K, V> WriteMap(MapType typ)
{
int32_t id = impl->WriteMap(typ);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/portable/portable_reader.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/portable/portable_reader.h b/modules/platforms/cpp/core/include/ignite/portable/portable_reader.h
index 8a04f0f..d0533fd 100644
--- a/modules/platforms/cpp/core/include/ignite/portable/portable_reader.h
+++ b/modules/platforms/cpp/core/include/ignite/portable/portable_reader.h
@@ -311,6 +311,19 @@ namespace ignite
}
/**
+ * Read values and insert them to specified position.
+ *
+ * @param fieldName Field name.
+ * @param out Output iterator to the initial position in the destination sequence.
+ * @return Number of elements that have been read.
+ */
+ template<typename T, typename OutputIterator>
+ int32_t ReadCollection(const char* fieldName, OutputIterator out)
+ {
+ return impl->ReadCollection<T>(fieldName, out);
+ }
+
+ /**
* Start map read.
*
* @param fieldName Field name.
@@ -328,6 +341,22 @@ namespace ignite
}
/**
+ * Read type of the collection.
+ *
+ * @param fieldName Field name.
+ * @return Collection type.
+ */
+ CollectionType ReadCollectionType(const char* fieldName);
+
+ /**
+ * Read type of the collection.
+ *
+ * @param fieldName Field name.
+ * @return Collection size.
+ */
+ int32_t ReadCollectionSize(const char* fieldName);
+
+ /**
* Read object.
*
* @param fieldName Field name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/include/ignite/portable/portable_writer.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/portable/portable_writer.h b/modules/platforms/cpp/core/include/ignite/portable/portable_writer.h
index d5009ac..c225340 100644
--- a/modules/platforms/cpp/core/include/ignite/portable/portable_writer.h
+++ b/modules/platforms/cpp/core/include/ignite/portable/portable_writer.h
@@ -280,6 +280,33 @@ namespace ignite
}
/**
+ * Write values in interval [first, last).
+ *
+ * @param fieldName Field name.
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ */
+ template<typename InputIterator>
+ void WriteCollection(const char* fieldName, InputIterator first, InputIterator last)
+ {
+ WriteCollection(fieldName, first, last, IGNITE_COLLECTION_UNDEFINED);
+ }
+
+ /**
+ * Write values in interval [first, last).
+ *
+ * @param fieldName Field name.
+ * @param first Iterator pointing to the beginning of the interval.
+ * @param last Iterator pointing to the end of the interval.
+ * @param typ Collection type.
+ */
+ template<typename InputIterator>
+ void WriteCollection(const char* fieldName, InputIterator first, InputIterator last, CollectionType typ)
+ {
+ impl->WriteCollection(fieldName, first, last, typ);
+ }
+
+ /**
* Start map write.
*
* @param fieldName Field name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 58fa283..b7e4f7c 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -211,6 +211,7 @@
<ClInclude Include="..\..\include\ignite\impl\interop\interop_input_stream.h" />
<ClInclude Include="..\..\include\ignite\impl\interop\interop_memory.h" />
<ClInclude Include="..\..\include\ignite\impl\interop\interop_output_stream.h" />
+ <ClInclude Include="..\..\include\ignite\impl\interop\interop_stream_position_guard.h" />
<ClInclude Include="..\..\include\ignite\impl\operations.h" />
<ClInclude Include="..\..\include\ignite\impl\portable\portable_common.h" />
<ClInclude Include="..\..\include\ignite\impl\portable\portable_id_resolver.h" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index d18599d..83f2fc7 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -213,6 +213,9 @@
<ClInclude Include="..\..\include\ignite\cache\query\query_scan.h">
<Filter>Code\cache\query</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\impl\interop\interop_stream_position_guard.h">
+ <Filter>Code\impl\interop</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Code">
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
index e41dafc..a8196a1 100644
--- a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
@@ -22,6 +22,7 @@
#include "ignite/impl/portable/portable_utils.h"
#include "ignite/portable/portable_type.h"
#include "ignite/ignite_error.h"
+#include "ignite/impl/interop/interop_stream_position_guard.h"
using namespace ignite::impl::interop;
using namespace ignite::impl::portable;
@@ -477,6 +478,80 @@ namespace ignite
}
}
+ CollectionType PortableReaderImpl::ReadCollectionTypeUnprotected()
+ {
+ int32_t size = ReadCollectionSizeUnprotected();
+ if (size == -1)
+ return IGNITE_COLLECTION_UNDEFINED;
+
+ CollectionType typ = static_cast<CollectionType>(stream->ReadInt8());
+
+ return typ;
+ }
+
+ CollectionType PortableReaderImpl::ReadCollectionType()
+ {
+ InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
+
+ return ReadCollectionTypeUnprotected();
+ }
+
+ CollectionType PortableReaderImpl::ReadCollectionType(const char* fieldName)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen <= 0)
+ return IGNITE_COLLECTION_UNDEFINED;
+
+ return ReadCollectionTypeUnprotected();
+ }
+
+ int32_t PortableReaderImpl::ReadCollectionSizeUnprotected()
+ {
+ int8_t hdr = stream->ReadInt8();
+
+ if (hdr != IGNITE_TYPE_COLLECTION)
+ {
+ if (hdr != IGNITE_HDR_NULL)
+ ThrowOnInvalidHeader(IGNITE_TYPE_COLLECTION, hdr);
+
+ return -1;
+ }
+
+ int32_t size = stream->ReadInt32();
+
+ return size;
+ }
+
+ int32_t PortableReaderImpl::ReadCollectionSize()
+ {
+ InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
+
+ return ReadCollectionSizeUnprotected();
+ }
+
+ int32_t PortableReaderImpl::ReadCollectionSize(const char* fieldName)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen <= 0)
+ return -1;
+
+ return ReadCollectionSizeUnprotected();
+ }
+
bool PortableReaderImpl::HasNextElement(int32_t id) const
{
return elemId == id && elemRead < elemCnt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/src/portable/portable_raw_reader.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/portable/portable_raw_reader.cpp b/modules/platforms/cpp/core/src/portable/portable_raw_reader.cpp
index f659913..775c561 100644
--- a/modules/platforms/cpp/core/src/portable/portable_raw_reader.cpp
+++ b/modules/platforms/cpp/core/src/portable/portable_raw_reader.cpp
@@ -131,5 +131,15 @@ namespace ignite
return PortableStringArrayReader(impl, id, size);
}
+
+ CollectionType PortableRawReader::ReadCollectionType()
+ {
+ return impl->ReadCollectionType();
+ }
+
+ int32_t PortableRawReader::ReadCollectionSize()
+ {
+ return impl->ReadCollectionSize();
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6695e6c3/modules/platforms/cpp/core/src/portable/portable_reader.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/portable/portable_reader.cpp b/modules/platforms/cpp/core/src/portable/portable_reader.cpp
index fe8fba1..62c1e67 100644
--- a/modules/platforms/cpp/core/src/portable/portable_reader.cpp
+++ b/modules/platforms/cpp/core/src/portable/portable_reader.cpp
@@ -132,6 +132,16 @@ namespace ignite
return PortableStringArrayReader(impl, id, size);
}
+ CollectionType PortableReader::ReadCollectionType(const char* fieldName)
+ {
+ return impl->ReadCollectionType(fieldName);
+ }
+
+ int32_t PortableReader::ReadCollectionSize(const char* fieldName)
+ {
+ return impl->ReadCollectionSize(fieldName);
+ }
+
PortableRawReader PortableReader::RawReader()
{
impl->SetRawMode();
[6/6] ignite git commit: Merge branch 'ignite-1282' into ignite-1655
Posted by vo...@apache.org.
Merge branch 'ignite-1282' into ignite-1655
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1288066
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1288066
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1288066
Branch: refs/heads/ignite-1655
Commit: f12880664799ed4e163c92ed10bf899965d00f7f
Parents: c1952ac 91eeab7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Oct 15 15:53:27 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Oct 15 15:53:27 2015 +0300
----------------------------------------------------------------------
.../src/portable_reader_writer_raw_test.cpp | 63 +++-
.../src/portable_reader_writer_test.cpp | 65 ++++
.../interop/interop_stream_position_guard.h | 79 +++++
.../ignite/impl/portable/portable_reader_impl.h | 104 ++++++
.../ignite/impl/portable/portable_writer_impl.h | 55 +++
.../ignite/portable/portable_raw_reader.h | 26 ++
.../ignite/portable/portable_raw_writer.h | 30 +-
.../include/ignite/portable/portable_reader.h | 29 ++
.../include/ignite/portable/portable_writer.h | 27 ++
.../platforms/cpp/core/project/vs/core.vcxproj | 1 +
.../cpp/core/project/vs/core.vcxproj.filters | 3 +
.../src/impl/portable/portable_reader_impl.cpp | 75 ++++
.../core/src/portable/portable_raw_reader.cpp | 10 +
.../cpp/core/src/portable/portable_reader.cpp | 10 +
.../Cache/CacheTestAsyncWrapper.cs | 10 +-
.../Apache.Ignite.Core.Tests/EventsTest.cs | 110 +++---
.../IgniteStartStopTest.cs | 4 +-
.../Apache.Ignite.Core.Tests/MessagingTest.cs | 16 +-
.../Apache.Ignite.Core.csproj | 5 +-
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 14 +-
.../Cache/ICacheEntryProcessor.cs | 8 +-
.../Compute/ComputeTaskAdapter.cs | 16 +-
.../Compute/ComputeTaskSplitAdapter.cs | 8 +-
.../Apache.Ignite.Core/Compute/ICompute.cs | 82 ++---
.../Apache.Ignite.Core/Compute/IComputeFunc.cs | 8 +-
.../Apache.Ignite.Core/Compute/IComputeJob.cs | 4 +-
.../Compute/IComputeJobResult.cs | 6 +-
.../Compute/IComputeReducer.cs | 8 +-
.../Apache.Ignite.Core/Compute/IComputeTask.cs | 24 +-
.../Datastream/StreamTransformer.cs | 12 +-
.../Apache.Ignite.Core/Events/EventType.cs | 344 +++++++++----------
.../Apache.Ignite.Core/Events/IEventFilter.cs | 9 +-
.../Apache.Ignite.Core/Events/IEventListener.cs | 34 ++
.../dotnet/Apache.Ignite.Core/Events/IEvents.cs | 97 +-----
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 18 +-
.../Impl/Cache/CacheProxyImpl.cs | 6 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 24 +-
.../Apache.Ignite.Core/Impl/Compute/Compute.cs | 40 ++-
.../Impl/Compute/ComputeAsync.cs | 61 ++--
.../Impl/Compute/ComputeImpl.cs | 79 +++--
.../Apache.Ignite.Core/Impl/Events/Events.cs | 39 ++-
.../Impl/Events/RemoteListenEventFilter.cs | 10 +-
.../Impl/Messaging/MessageFilterHolder.cs | 177 ----------
.../Impl/Messaging/MessageListenerHolder.cs | 177 ++++++++++
.../Impl/Messaging/Messaging.cs | 22 +-
.../Impl/Portable/PortableMarshaller.cs | 2 +-
.../Impl/Portable/PortableUtils.cs | 2 +-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 4 +-
.../Messaging/IMessageFilter.cs | 35 --
.../Messaging/IMessageListener.cs | 38 ++
.../Apache.Ignite.Core/Messaging/IMessaging.cs | 15 +-
.../Events/EventsExample.cs | 20 +-
.../Apache.Ignite.ExamplesDll.csproj | 1 -
.../Events/LocalListener.cs | 7 +-
.../Events/RemoteFilter.cs | 42 ---
.../Messaging/LocalListener.cs | 2 +-
.../Messaging/RemoteOrderedListener.cs | 2 +-
.../Messaging/RemoteUnorderedListener.cs | 2 +-
58 files changed, 1353 insertions(+), 868 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f1288066/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUtils.cs
----------------------------------------------------------------------