You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/09 08:38:26 UTC
[12/50] [abbrv] ignite git commit: IGNITE-4074 Refactor async
(*future) operations in PlatformTarget
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index 5d1add640..bc1b4bb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -53,10 +53,11 @@ namespace Apache.Ignite.Core.Impl.Events
EnableLocal = 8,
DisableLocal = 9,
GetEnabledEvents = 10,
- WithAsync = 11,
IsEnabled = 12,
LocalListen = 13,
- StopLocalListen = 14
+ StopLocalListen = 14,
+ RemoteQueryAsync = 15,
+ WaitForLocalAsync = 16
}
/** Map from user func to local wrapper, needed for invoke/unsubscribe. */
@@ -66,9 +67,6 @@ namespace Apache.Ignite.Core.Impl.Events
/** Cluster group. */
private readonly IClusterGroup _clusterGroup;
- /** Async instance. */
- private readonly Lazy<Events> _asyncInstance;
-
/// <summary>
/// Initializes a new instance of the <see cref="Events" /> class.
/// </summary>
@@ -81,17 +79,6 @@ namespace Apache.Ignite.Core.Impl.Events
Debug.Assert(clusterGroup != null);
_clusterGroup = clusterGroup;
-
- _asyncInstance = new Lazy<Events>(() => new Events(this));
- }
-
- /// <summary>
- /// Initializes a new async instance.
- /// </summary>
- /// <param name="events">The events.</param>
- private Events(Events events) : base(UU.TargetOutObject(events.Target, (int) Op.WithAsync), events.Marshaller)
- {
- _clusterGroup = events.ClusterGroup;
}
/** <inheritDoc /> */
@@ -106,14 +93,6 @@ namespace Apache.Ignite.Core.Impl.Events
get { return (Ignite) ClusterGroup.Ignite; }
}
- /// <summary>
- /// Gets the asynchronous instance.
- /// </summary>
- private Events AsyncInstance
- {
- get { return _asyncInstance.Value; }
- }
-
/** <inheritDoc /> */
public ICollection<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
where T : IEvent
@@ -121,14 +100,7 @@ namespace Apache.Ignite.Core.Impl.Events
IgniteArgumentCheck.NotNull(filter, "filter");
return DoOutInOp((int) Op.RemoteQuery,
- writer =>
- {
- writer.Write(filter);
-
- writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds));
-
- WriteEventTypes(types, writer);
- },
+ writer => WriteRemoteQuery(filter, timeout, types, writer),
reader => ReadEvents<T>(reader));
}
@@ -136,11 +108,11 @@ namespace Apache.Ignite.Core.Impl.Events
public Task<ICollection<T>> RemoteQueryAsync<T>(IEventFilter<T> filter, TimeSpan? timeout = null,
params int[] types) where T : IEvent
{
- AsyncInstance.RemoteQuery(filter, timeout, types);
+ IgniteArgumentCheck.NotNull(filter, "filter");
// ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010)
- return GetFuture<ICollection<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target,
- futId, futTyp, (int) Op.RemoteQuery), convertFunc: ReadEvents<T>).Task;
+ return DoOutOpAsync<ICollection<T>>((int) Op.RemoteQueryAsync,
+ w => WriteRemoteQuery(filter, timeout, types, w), convertFunc: ReadEvents<T>);
}
/** <inheritDoc /> */
@@ -234,46 +206,53 @@ namespace Apache.Ignite.Core.Impl.Events
/** <inheritDoc /> */
public T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
{
- long hnd = 0;
+ var hnd = GetFilterHandle(filter);
try
{
- return WaitForLocal0(filter, ref hnd, types);
+ return DoOutInOp((int) Op.WaitForLocal,
+ writer =>
+ {
+ writer.WriteObject(hnd);
+ WriteEventTypes(types, writer);
+ },
+ reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
}
finally
{
- if (filter != null)
- Ignite.HandleRegistry.Release(hnd);
+ if (hnd != null)
+ Ignite.HandleRegistry.Release(hnd.Value);
}
}
/** <inheritDoc /> */
public Task<T> WaitForLocalAsync<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
{
- long hnd = 0;
+ var hnd = GetFilterHandle(filter);
try
{
- AsyncInstance.WaitForLocal0(filter, ref hnd, types);
-
- // ReSharper disable once RedundantTypeArgumentsOfMethod (won't compile in VS2010)
- var fut = GetFuture<T>((futId, futTyp) => UU.TargetListenFutureForOperation(AsyncInstance.Target, futId,
- futTyp, (int) Op.WaitForLocal), convertFunc: reader => (T) EventReader.Read<IEvent>(reader));
+ var task = DoOutOpAsync((int) Op.WaitForLocalAsync, writer =>
+ {
+ writer.WriteObject(hnd);
+ WriteEventTypes(types, writer);
+ }, convertFunc: EventReader.Read<T>);
- if (filter != null)
+ if (hnd != null)
{
// Dispose handle as soon as future ends.
- fut.Task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd));
+ task.ContinueWith(x => Ignite.HandleRegistry.Release(hnd.Value));
}
- return fut.Task;
+ return task;
}
catch (Exception)
{
- Ignite.HandleRegistry.Release(hnd);
+ if (hnd != null)
+ Ignite.HandleRegistry.Release(hnd.Value);
+
throw;
}
-
}
/** <inheritDoc /> */
@@ -392,38 +371,13 @@ namespace Apache.Ignite.Core.Impl.Events
}
/// <summary>
- /// Waits for the specified events.
+ /// Gets the filter handle.
/// </summary>
- /// <typeparam name="T">Type of events.</typeparam>
- /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param>
- /// <param name="handle">The filter handle, if applicable.</param>
- /// <param name="types">Types of the events to wait for.
- /// If not provided, all events will be passed to the filter.</param>
- /// <returns>Ignite event.</returns>
- private T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent
- {
- if (filter != null)
- handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter
- {
- InvokeFunc = stream => InvokeLocalFilter(stream, filter)
- });
-
- var hnd = handle;
-
- return DoOutInOp((int)Op.WaitForLocal,
- writer =>
- {
- if (filter != null)
- {
- writer.WriteBoolean(true);
- writer.WriteLong(hnd);
- }
- else
- writer.WriteBoolean(false);
-
- WriteEventTypes(types, writer);
- },
- reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
+ private long? GetFilterHandle<T>(IEventFilter<T> filter) where T : IEvent
+ {
+ return filter != null
+ ? Ignite.HandleRegistry.Allocate(new LocalEventFilter<T>(Marshaller, filter))
+ : (long?) null;
}
/// <summary>
@@ -559,20 +513,6 @@ namespace Apache.Ignite.Core.Impl.Events
/// <param name="stream">The stream.</param>
/// <param name="listener">The listener.</param>
/// <returns>Filter invocation result.</returns>
- private bool InvokeLocalFilter<T>(IBinaryStream stream, IEventFilter<T> listener) where T : IEvent
- {
- var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
-
- return listener.Invoke(evt);
- }
-
- /// <summary>
- /// Invokes local filter using data from specified stream.
- /// </summary>
- /// <typeparam name="T">Event object type.</typeparam>
- /// <param name="stream">The stream.</param>
- /// <param name="listener">The listener.</param>
- /// <returns>Filter invocation result.</returns>
private bool InvokeLocalListener<T>(IBinaryStream stream, IEventListener<T> listener) where T : IEvent
{
var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
@@ -614,17 +554,49 @@ namespace Apache.Ignite.Core.Impl.Events
}
/// <summary>
+ /// Writes the remote query.
+ /// </summary>
+ /// <param name="filter">The filter.</param>
+ /// <param name="timeout">The timeout.</param>
+ /// <param name="types">The types.</param>
+ /// <param name="writer">The writer.</param>
+ private static void WriteRemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout, int[] types,
+ IBinaryRawWriter writer)
+ where T : IEvent
+ {
+ writer.WriteObject(filter);
+
+ writer.WriteLong((long)(timeout == null ? 0 : timeout.Value.TotalMilliseconds));
+
+ WriteEventTypes(types, writer);
+ }
+
+ /// <summary>
/// Local user filter wrapper.
/// </summary>
- private class LocalEventFilter : IInteropCallback
+ private class LocalEventFilter<T> : IInteropCallback where T : IEvent
{
/** */
- public Func<IBinaryStream, bool> InvokeFunc;
+ private readonly Marshaller _marshaller;
+
+ /** */
+ private readonly IEventFilter<T> _listener;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="LocalEventFilter{T}"/> class.
+ /// </summary>
+ public LocalEventFilter(Marshaller marshaller, IEventFilter<T> listener)
+ {
+ _marshaller = marshaller;
+ _listener = listener;
+ }
/** <inheritdoc /> */
public int Invoke(IBinaryStream stream)
{
- return InvokeFunc(stream) ? 1 : 0;
+ var evt = EventReader.Read<T>(_marshaller.StartUnmarshal(stream));
+
+ return _listener.Invoke(evt) ? 1 : 0;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index a8e3075..79df470 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -485,7 +485,7 @@ namespace Apache.Ignite.Core.Impl
/// </returns>
public ICache<TK, TV> Cache<TK, TV>(IUnmanagedTarget nativeCache, bool keepBinary = false)
{
- return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false, false);
+ return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false);
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 2216d1a..1b43438 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Messaging
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Collections;
@@ -30,7 +31,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
using Apache.Ignite.Core.Impl.Resource;
using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Messaging functionality.
@@ -49,7 +49,8 @@ namespace Apache.Ignite.Core.Impl.Messaging
SendOrdered = 5,
StopLocalListen = 6,
StopRemoteListen = 7,
- WithAsync = 8
+ RemoteListenAsync = 9,
+ StopRemoteListenAsync = 10
}
/** Map from user (func+topic) -> id, needed for unsubscription. */
@@ -59,12 +60,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
/** Grid */
private readonly Ignite _ignite;
- /** Async instance. */
- private readonly Lazy<Messaging> _asyncInstance;
-
- /** Async flag. */
- private readonly bool _isAsync;
-
/** Cluster group. */
private readonly IClusterGroup _clusterGroup;
@@ -82,20 +77,6 @@ namespace Apache.Ignite.Core.Impl.Messaging
_clusterGroup = prj;
_ignite = (Ignite) prj.Ignite;
-
- _asyncInstance = new Lazy<Messaging>(() => new Messaging(this));
- }
-
- /// <summary>
- /// Initializes a new async instance.
- /// </summary>
- /// <param name="messaging">The messaging.</param>
- private Messaging(Messaging messaging) : base(
- UU.TargetOutObject(messaging.Target, (int) Op.WithAsync), messaging.Marshaller)
- {
- _isAsync = true;
- _ignite = messaging._ignite;
- _clusterGroup = messaging.ClusterGroup;
}
/** <inheritdoc /> */
@@ -104,16 +85,7 @@ namespace Apache.Ignite.Core.Impl.Messaging
get { return _clusterGroup; }
}
- /// <summary>
- /// Gets the asynchronous instance.
- /// </summary>
- private Messaging AsyncInstance
- {
- get { return _asyncInstance.Value; }
- }
-
/** <inheritdoc /> */
-
public void Send(object message, object topic = null)
{
IgniteArgumentCheck.NotNull(message, "message");
@@ -216,65 +188,28 @@ namespace Apache.Ignite.Core.Impl.Messaging
/** <inheritdoc /> */
public Guid RemoteListen<T>(IMessageListener<T> listener, object topic = null)
{
- IgniteArgumentCheck.NotNull(listener, "filter");
-
- var filter0 = MessageListenerHolder.CreateLocal(_ignite, listener);
- var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
-
- try
- {
- Guid id = Guid.Empty;
-
- DoOutInOp((int) Op.RemoteListen,
- writer =>
- {
- writer.Write(filter0);
- writer.WriteLong(filterHnd);
- writer.Write(topic);
- },
- input =>
- {
- var id0 = Marshaller.StartUnmarshal(input).GetRawReader().ReadGuid();
-
- Debug.Assert(_isAsync || id0.HasValue);
-
- if (id0.HasValue)
- id = id0.Value;
- });
-
- return id;
- }
- catch (Exception)
- {
- _ignite.HandleRegistry.Release(filterHnd);
-
- throw;
- }
+ return RemoteListen(listener, topic,
+ (writeAct, readAct) => DoOutInOp((int) Op.RemoteListen, writeAct,
+ stream => readAct(Marshaller.StartUnmarshal(stream))));
}
/** <inheritdoc /> */
public Task<Guid> RemoteListenAsync<T>(IMessageListener<T> listener, object topic = null)
{
- AsyncInstance.RemoteListen(listener, topic);
-
- return AsyncInstance.GetTask<Guid>();
+ return RemoteListen(listener, topic,
+ (writeAct, readAct) => DoOutOpAsync((int) Op.RemoteListenAsync, writeAct, convertFunc: readAct));
}
/** <inheritdoc /> */
public void StopRemoteListen(Guid opId)
{
- DoOutOp((int) Op.StopRemoteListen, writer =>
- {
- writer.WriteGuid(opId);
- });
+ DoOutOp((int) Op.StopRemoteListen, writer => writer.WriteGuid(opId));
}
/** <inheritdoc /> */
public Task StopRemoteListenAsync(Guid opId)
{
- AsyncInstance.StopRemoteListen(opId);
-
- return AsyncInstance.GetTask();
+ return DoOutOpAsync((int) Op.StopRemoteListenAsync, writer => writer.WriteGuid(opId));
}
/// <summary>
@@ -287,5 +222,33 @@ namespace Apache.Ignite.Core.Impl.Messaging
{
return new KeyValuePair<object, object>(filter, topic);
}
+
+ /// <summary>
+ /// Remotes listen.
+ /// </summary>
+ private TRes RemoteListen<T, TRes>(IMessageListener<T> filter, object topic,
+ Func<Action<IBinaryRawWriter>, Func<BinaryReader, Guid>, TRes> invoker)
+ {
+ IgniteArgumentCheck.NotNull(filter, "filter");
+
+ var filter0 = MessageListenerHolder.CreateLocal(_ignite, filter);
+ var filterHnd = _ignite.HandleRegistry.AllocateSafe(filter0);
+
+ try
+ {
+ return invoker(writer =>
+ {
+ writer.WriteObject(filter0);
+ writer.WriteLong(filterHnd);
+ writer.WriteObject(topic);
+ }, input => input.ReadGuid() ?? Guid.Empty);
+ }
+ catch (Exception)
+ {
+ _ignite.HandleRegistry.Release(filterHnd);
+
+ throw;
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f392830..d5b69a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Binary.Metadata;
@@ -757,6 +758,102 @@ namespace Apache.Ignite.Core.Impl
#endregion
+ #region Async operations
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <returns>Task for async operation</returns>
+ protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null)
+ {
+ return DoOutOpAsync<object>(type, writeAction);
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="T">Type of the result.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+ /// <param name="convertFunc">The function to read future result from stream.</param>
+ /// <returns>Task for async operation</returns>
+ protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false,
+ Func<BinaryReader, T> convertFunc = null)
+ {
+ return GetFuture((futId, futType) => DoOutOp(type, w =>
+ {
+ if (writeAction != null)
+ writeAction(w);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ }), keepBinary, convertFunc).Task;
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="T">Type of the result.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <returns>Future for async operation</returns>
+ protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction)
+ {
+ return GetFuture<T>((futId, futType) => DoOutOpObject(type, w =>
+ {
+ writeAction(w);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ }));
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="TR">Type of the result.</typeparam>
+ /// <typeparam name="T1">The type of the first arg.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="val1">First arg.</param>
+ /// <returns>
+ /// Task for async operation
+ /// </returns>
+ protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1)
+ {
+ return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+ {
+ w.WriteObject(val1);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ })).Task;
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="TR">Type of the result.</typeparam>
+ /// <typeparam name="T1">The type of the first arg.</typeparam>
+ /// <typeparam name="T2">The type of the second arg.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="val1">First arg.</param>
+ /// <param name="val2">Second arg.</param>
+ /// <returns>
+ /// Task for async operation
+ /// </returns>
+ protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2)
+ {
+ return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+ {
+ w.WriteObject(val1);
+ w.WriteObject(val2);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ })).Task;
+ }
+
+ #endregion
+
#region Miscelanneous
/// <summary>
@@ -846,7 +943,7 @@ namespace Apache.Ignite.Core.Impl
/// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
/// <param name="convertFunc">The function to read future result from stream.</param>
/// <returns>Created future.</returns>
- protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+ private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
Func<BinaryReader, T> convertFunc = null)
{
var futType = FutureType.Object;
@@ -862,7 +959,18 @@ namespace Apache.Ignite.Core.Impl
var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
- var futTarget = listenAction(futHnd, (int) futType);
+ IUnmanagedTarget futTarget;
+
+ try
+ {
+ futTarget = listenAction(futHnd, (int)futType);
+ }
+ catch (Exception)
+ {
+ _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+ throw;
+ }
fut.SetTarget(futTarget);
@@ -893,25 +1001,18 @@ namespace Apache.Ignite.Core.Impl
var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
- listenAction(futHnd, (int)futType);
-
- return fut;
- }
+ try
+ {
+ listenAction(futHnd, (int)futType);
+ }
+ catch (Exception)
+ {
+ _marsh.Ignite.HandleRegistry.Release(futHnd);
- /// <summary>
- /// Creates a task to listen for the last async op.
- /// </summary>
- protected Task GetTask()
- {
- return GetTask<object>();
- }
+ throw;
+ }
- /// <summary>
- /// Creates a task to listen for the last async op.
- /// </summary>
- protected Task<T> GetTask<T>()
- {
- return GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp)).Task;
+ return fut;
}
#endregion
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
index 8fc973b..9d9acfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
@@ -17,12 +17,12 @@
namespace Apache.Ignite.Core.Impl.Services
{
- using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
@@ -51,9 +51,6 @@ namespace Apache.Ignite.Core.Impl.Services
private const int OpDescriptors = 5;
/** */
- private const int OpWithAsync = 6;
-
- /** */
private const int OpWithServerKeepBinary = 7;
/** */
@@ -66,6 +63,18 @@ namespace Apache.Ignite.Core.Impl.Services
private const int OpCancelAll = 10;
/** */
+ private const int OpDeployAsync = 11;
+
+ /** */
+ private const int OpDeployMultipleAsync = 12;
+
+ /** */
+ private const int OpCancelAsync = 13;
+
+ /** */
+ private const int OpCancelAllAsync = 14;
+
+ /** */
private readonly IClusterGroup _clusterGroup;
/** Invoker binary flag. */
@@ -74,9 +83,6 @@ namespace Apache.Ignite.Core.Impl.Services
/** Server binary flag. */
private readonly bool _srvKeepBinary;
- /** Async instance. */
- private readonly Lazy<Services> _asyncInstance;
-
/// <summary>
/// Initializes a new instance of the <see cref="Services" /> class.
/// </summary>
@@ -94,20 +100,6 @@ namespace Apache.Ignite.Core.Impl.Services
_clusterGroup = clusterGroup;
_keepBinary = keepBinary;
_srvKeepBinary = srvKeepBinary;
-
- _asyncInstance = new Lazy<Services>(() => new Services(this));
- }
-
- /// <summary>
- /// Initializes a new async instance.
- /// </summary>
- /// <param name="services">The services.</param>
- private Services(Services services) : base(UU.TargetOutObject(services.Target, OpWithAsync),
- services.Marshaller)
- {
- _clusterGroup = services.ClusterGroup;
- _keepBinary = services._keepBinary;
- _srvKeepBinary = services._srvKeepBinary;
}
/** <inheritDoc /> */
@@ -134,14 +126,6 @@ namespace Apache.Ignite.Core.Impl.Services
get { return _clusterGroup; }
}
- /// <summary>
- /// Gets the asynchronous instance.
- /// </summary>
- private Services AsyncInstance
- {
- get { return _asyncInstance.Value; }
- }
-
/** <inheritDoc /> */
public void DeployClusterSingleton(string name, IService service)
{
@@ -154,9 +138,10 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task DeployClusterSingletonAsync(string name, IService service)
{
- AsyncInstance.DeployClusterSingleton(name, service);
+ IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+ IgniteArgumentCheck.NotNull(service, "service");
- return AsyncInstance.GetTask();
+ return DeployMultipleAsync(name, service, 1, 1);
}
/** <inheritDoc /> */
@@ -171,9 +156,10 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task DeployNodeSingletonAsync(string name, IService service)
{
- AsyncInstance.DeployNodeSingleton(name, service);
+ IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+ IgniteArgumentCheck.NotNull(service, "service");
- return AsyncInstance.GetTask();
+ return DeployMultipleAsync(name, service, 0, 1);
}
/** <inheritDoc /> */
@@ -197,9 +183,19 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task DeployKeyAffinitySingletonAsync<TK>(string name, IService service, string cacheName, TK affinityKey)
{
- AsyncInstance.DeployKeyAffinitySingleton(name, service, cacheName, affinityKey);
+ IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+ IgniteArgumentCheck.NotNull(service, "service");
+ IgniteArgumentCheck.NotNull(affinityKey, "affinityKey");
- return AsyncInstance.GetTask();
+ return DeployAsync(new ServiceConfiguration
+ {
+ Name = name,
+ Service = service,
+ CacheName = cacheName,
+ AffinityKey = affinityKey,
+ TotalCount = 1,
+ MaxPerNodeCount = 1
+ });
}
/** <inheritDoc /> */
@@ -220,9 +216,16 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task DeployMultipleAsync(string name, IService service, int totalCount, int maxPerNodeCount)
{
- AsyncInstance.DeployMultiple(name, service, totalCount, maxPerNodeCount);
+ IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+ IgniteArgumentCheck.NotNull(service, "service");
- return AsyncInstance.GetTask();
+ return DoOutOpAsync(OpDeployMultipleAsync, w =>
+ {
+ w.WriteString(name);
+ w.WriteObject(service);
+ w.WriteInt(totalCount);
+ w.WriteInt(maxPerNodeCount);
+ });
}
/** <inheritDoc /> */
@@ -230,28 +233,15 @@ namespace Apache.Ignite.Core.Impl.Services
{
IgniteArgumentCheck.NotNull(configuration, "configuration");
- DoOutOp(OpDeploy, w =>
- {
- w.WriteString(configuration.Name);
- w.WriteObject(configuration.Service);
- w.WriteInt(configuration.TotalCount);
- w.WriteInt(configuration.MaxPerNodeCount);
- w.WriteString(configuration.CacheName);
- w.WriteObject(configuration.AffinityKey);
-
- if (configuration.NodeFilter != null)
- w.WriteObject(configuration.NodeFilter);
- else
- w.WriteObject<object>(null);
- });
+ DoOutOp(OpDeploy, w => WriteServiceConfiguration(configuration, w));
}
/** <inheritDoc /> */
public Task DeployAsync(ServiceConfiguration configuration)
{
- AsyncInstance.Deploy(configuration);
+ IgniteArgumentCheck.NotNull(configuration, "configuration");
- return AsyncInstance.GetTask();
+ return DoOutOpAsync(OpDeployAsync, w => WriteServiceConfiguration(configuration, w));
}
/** <inheritDoc /> */
@@ -265,9 +255,9 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task CancelAsync(string name)
{
- AsyncInstance.Cancel(name);
+ IgniteArgumentCheck.NotNullOrEmpty(name, "name");
- return AsyncInstance.GetTask();
+ return DoOutOpAsync(OpCancelAsync, w => w.WriteString(name));
}
/** <inheritDoc /> */
@@ -279,9 +269,7 @@ namespace Apache.Ignite.Core.Impl.Services
/** <inheritDoc /> */
public Task CancelAllAsync()
{
- AsyncInstance.CancelAll();
-
- return AsyncInstance.GetTask();
+ return DoOutOpAsync(OpCancelAllAsync);
}
/** <inheritDoc /> */
@@ -391,5 +379,26 @@ namespace Apache.Ignite.Core.Impl.Services
writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args, platform),
stream => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary), proxy.Target);
}
+
+ /// <summary>
+ /// Writes the service configuration.
+ /// </summary>
+ private static void WriteServiceConfiguration(ServiceConfiguration configuration, IBinaryRawWriter w)
+ {
+ Debug.Assert(configuration != null);
+ Debug.Assert(w != null);
+
+ w.WriteString(configuration.Name);
+ w.WriteObject(configuration.Service);
+ w.WriteInt(configuration.TotalCount);
+ w.WriteInt(configuration.MaxPerNodeCount);
+ w.WriteString(configuration.CacheName);
+ w.WriteObject(configuration.AffinityKey);
+
+ if (configuration.NodeFilter != null)
+ w.WriteObject(configuration.NodeFilter);
+ else
+ w.WriteObject<object>(null);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
index 796044d..7de9be1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Transactions
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Transactions;
@@ -214,11 +213,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
/// </summary>
internal Task CommitAsync(TransactionImpl tx)
{
- return GetFuture<object>((futId, futTyp) => DoOutOp(OpCommitAsync, (IBinaryStream s) =>
- {
- s.WriteLong(tx.Id);
- s.WriteLong(futId);
- })).Task;
+ return DoOutOpAsync(OpCommitAsync, w => w.WriteLong(tx.Id));
}
/// <summary>
@@ -226,11 +221,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
/// </summary>
internal Task RollbackAsync(TransactionImpl tx)
{
- return GetFuture<object>((futId, futTyp) => DoOutOp(OpRollbackAsync, (IBinaryStream s) =>
- {
- s.WriteLong(tx.Id);
- s.WriteLong(futId);
- })).Task;
+ return DoOutOpAsync(OpRollbackAsync, w => w.WriteLong(tx.Id));
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index c352f0c..6b867de 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -150,18 +150,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")]
public static extern void* TargetOutObject(void* ctx, void* target, int opType);
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFuture")]
- public static extern void TargetListenFut(void* ctx, void* target, long futId, int typ);
-
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")]
- public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId);
-
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")]
- public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ);
-
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")]
- public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId);
-
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")]
public static extern void* Acquire(void* ctx, void* target);
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index b1d4ecd..36dc332 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -460,31 +460,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
return target.ChangeTarget(res);
}
- internal static void TargetListenFuture(IUnmanagedTarget target, long futId, int typ)
- {
- JNI.TargetListenFut(target.Context, target.Target, futId, typ);
- }
-
- internal static void TargetListenFutureForOperation(IUnmanagedTarget target, long futId, int typ, int opId)
- {
- JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId);
- }
-
- internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ)
- {
- var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ);
-
- return target.ChangeTarget(res);
- }
-
- internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId,
- int typ, int opId)
- {
- var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId);
-
- return target.ChangeTarget(res);
- }
-
#endregion
#region NATIVE METHODS: MISCELANNEOUS
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index ac065bc..72ce015 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -1,5 +1,6 @@
\ufeff<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/CSharpLanguageProject/LanguageLevel/@EntryValue">CSharp50</s:String>
- <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean>
+ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
</wpf:ResourceDictionary>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
index cf9e287..9672abe 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.TeamCity.DotSettings
@@ -25,6 +25,7 @@
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticMemberInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=StaticFieldInGenericType/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VirtualMemberNeverOverriden_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
+ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/QualifiedUsingAtNestedScope/@EntryValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file