You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:41:59 UTC
[30/49] ignite git commit: IGNITE-4033 Streamline platform callback
interface
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index cc205e8..09933be 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
- using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using Apache.Ignite.Core.Cache.Affinity;
@@ -77,7 +76,14 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/** Keep references to created delegates. */
// ReSharper disable once CollectionNeverQueried.Local
- private readonly List<Delegate> _delegates = new List<Delegate>(50);
+ private readonly List<Delegate> _delegates = new List<Delegate>(5);
+
+ /** Handlers array. */
+ private readonly InLongOutLongHandler[] _inLongOutLongHandlers = new InLongOutLongHandler[62];
+
+ /** Handlers array. */
+ private readonly InLongLongLongObjectOutLongHandler[] _inLongLongLongObjectOutLongHandlers
+ = new InLongLongLongObjectOutLongHandler[62];
/** Initialized flag. */
private readonly ManualResetEventSlim _initEvent = new ManualResetEventSlim(false);
@@ -107,90 +113,19 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/** Operation: prepare .Net. */
private const int OpPrepareDotNet = 1;
- private delegate long CacheStoreCreateCallbackDelegate(void* target, long memPtr);
- private delegate int CacheStoreInvokeCallbackDelegate(void* target, long objPtr, long memPtr);
- private delegate void CacheStoreDestroyCallbackDelegate(void* target, long objPtr);
- private delegate long CacheStoreSessionCreateCallbackDelegate(void* target, long storePtr);
-
- private delegate long CacheEntryFilterCreateCallbackDelegate(void* target, long memPtr);
- private delegate int CacheEntryFilterApplyCallbackDelegate(void* target, long objPtr, long memPtr);
- private delegate void CacheEntryFilterDestroyCallbackDelegate(void* target, long objPtr);
-
- private delegate void CacheInvokeCallbackDelegate(void* target, long inMemPtr, long outMemPtr);
-
- private delegate void ComputeTaskMapCallbackDelegate(void* target, long taskPtr, long inMemPtr, long outMemPtr);
- private delegate int ComputeTaskJobResultCallbackDelegate(void* target, long taskPtr, long jobPtr, long memPtr);
- private delegate void ComputeTaskReduceCallbackDelegate(void* target, long taskPtr);
- private delegate void ComputeTaskCompleteCallbackDelegate(void* target, long taskPtr, long memPtr);
- private delegate int ComputeJobSerializeCallbackDelegate(void* target, long jobPtr, long memPtr);
- private delegate long ComputeJobCreateCallbackDelegate(void* target, long memPtr);
- private delegate void ComputeJobExecuteCallbackDelegate(void* target, long jobPtr, int cancel, long memPtr);
- private delegate void ComputeJobCancelCallbackDelegate(void* target, long jobPtr);
- private delegate void ComputeJobDestroyCallbackDelegate(void* target, long jobPtr);
-
- private delegate void ContinuousQueryListenerApplyCallbackDelegate(void* target, long lsnrPtr, long memPtr);
- private delegate long ContinuousQueryFilterCreateCallbackDelegate(void* target, long memPtr);
- private delegate int ContinuousQueryFilterApplyCallbackDelegate(void* target, long filterPtr, long memPtr);
- private delegate void ContinuousQueryFilterReleaseCallbackDelegate(void* target, long filterPtr);
-
- private delegate void DataStreamerTopologyUpdateCallbackDelegate(void* target, long ldrPtr, long topVer, int topSize);
- private delegate void DataStreamerStreamReceiverInvokeCallbackDelegate(void* target, long ptr, void* cache, long memPtr, byte keepPortable);
-
- private delegate void FutureByteResultCallbackDelegate(void* target, long futPtr, int res);
- private delegate void FutureBoolResultCallbackDelegate(void* target, long futPtr, int res);
- private delegate void FutureShortResultCallbackDelegate(void* target, long futPtr, int res);
- private delegate void FutureCharResultCallbackDelegate(void* target, long futPtr, int res);
- private delegate void FutureIntResultCallbackDelegate(void* target, long futPtr, int res);
- private delegate void FutureFloatResultCallbackDelegate(void* target, long futPtr, float res);
- private delegate void FutureLongResultCallbackDelegate(void* target, long futPtr, long res);
- private delegate void FutureDoubleResultCallbackDelegate(void* target, long futPtr, double res);
- private delegate void FutureObjectResultCallbackDelegate(void* target, long futPtr, long memPtr);
- private delegate void FutureNullResultCallbackDelegate(void* target, long futPtr);
- private delegate void FutureErrorCallbackDelegate(void* target, long futPtr, long memPtr);
-
- private delegate void LifecycleOnEventCallbackDelegate(void* target, long ptr, int evt);
-
- private delegate void MemoryReallocateCallbackDelegate(void* target, long memPtr, int cap);
-
- private delegate long MessagingFilterCreateCallbackDelegate(void* target, long memPtr);
- private delegate int MessagingFilterApplyCallbackDelegate(void* target, long ptr, long memPtr);
- private delegate void MessagingFilterDestroyCallbackDelegate(void* target, long ptr);
-
- private delegate long EventFilterCreateCallbackDelegate(void* target, long memPtr);
- private delegate int EventFilterApplyCallbackDelegate(void* target, long ptr, long memPtr);
- private delegate void EventFilterDestroyCallbackDelegate(void* target, long ptr);
-
- private delegate long ServiceInitCallbackDelegate(void* target, long memPtr);
- private delegate void ServiceExecuteCallbackDelegate(void* target, long svcPtr, long memPtr);
- private delegate void ServiceCancelCallbackDelegate(void* target, long svcPtr, long memPtr);
- private delegate void ServiceInvokeMethodCallbackDelegate(void* target, long svcPtr, long inMemPtr, long outMemPtr);
-
- private delegate int ClusterNodeFilterApplyCallbackDelegate(void* target, long memPtr);
-
- private delegate void NodeInfoCallbackDelegate(void* target, long memPtr);
-
- private delegate void OnStartCallbackDelegate(void* target, void* proc, long memPtr);
- private delegate void OnStopCallbackDelegate(void* target);
-
private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, sbyte* stackTraceChars, int stackTraceCharsLen, void* errData, int errDataLen);
- private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1);
- private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2);
-
- private delegate void OnClientDisconnectedDelegate(void* target);
- private delegate void OnClientReconnectedDelegate(void* target, bool clusterRestarted);
-
private delegate void LoggerLogDelegate(void* target, int level, sbyte* messageChars, int messageCharsLen, sbyte* categoryChars, int categoryCharsLen, sbyte* errorInfoChars, int errorInfoCharsLen, long memPtr);
private delegate bool LoggerIsLevelEnabledDelegate(void* target, int level);
- private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc);
- private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr);
- private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr);
- private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr);
- private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr);
-
private delegate void ConsoleWriteDelegate(sbyte* chars, int charsLen, bool isErr);
+ private delegate long InLongOutLongDelegate(void* target, int type, long val);
+ private delegate long InLongLongLongObjectOutLongDelegate(void* target, int type, long val1, long val2, long val3, void* arg);
+
+ private delegate long InLongOutLongFunc(long val);
+ private delegate long InLongLongLongObjectOutLongFunc(long val1, long val2, long val3, void* arg);
+
/// <summary>
/// Constructor.
/// </summary>
@@ -204,89 +139,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
target = IntPtr.Zero.ToPointer(), // Target is not used in .Net as we rely on dynamic FP creation.
- cacheStoreCreate = CreateFunctionPointer((CacheStoreCreateCallbackDelegate) CacheStoreCreate),
- cacheStoreInvoke = CreateFunctionPointer((CacheStoreInvokeCallbackDelegate) CacheStoreInvoke),
- cacheStoreDestroy = CreateFunctionPointer((CacheStoreDestroyCallbackDelegate) CacheStoreDestroy),
-
- cacheStoreSessionCreate = CreateFunctionPointer((CacheStoreSessionCreateCallbackDelegate) CacheStoreSessionCreate),
-
- cacheEntryFilterCreate = CreateFunctionPointer((CacheEntryFilterCreateCallbackDelegate)CacheEntryFilterCreate),
- cacheEntryFilterApply = CreateFunctionPointer((CacheEntryFilterApplyCallbackDelegate)CacheEntryFilterApply),
- cacheEntryFilterDestroy = CreateFunctionPointer((CacheEntryFilterDestroyCallbackDelegate)CacheEntryFilterDestroy),
-
- cacheInvoke = CreateFunctionPointer((CacheInvokeCallbackDelegate) CacheInvoke),
-
- computeTaskMap = CreateFunctionPointer((ComputeTaskMapCallbackDelegate) ComputeTaskMap),
- computeTaskJobResult =
- CreateFunctionPointer((ComputeTaskJobResultCallbackDelegate) ComputeTaskJobResult),
- computeTaskReduce = CreateFunctionPointer((ComputeTaskReduceCallbackDelegate) ComputeTaskReduce),
- computeTaskComplete = CreateFunctionPointer((ComputeTaskCompleteCallbackDelegate) ComputeTaskComplete),
- computeJobSerialize = CreateFunctionPointer((ComputeJobSerializeCallbackDelegate) ComputeJobSerialize),
- computeJobCreate = CreateFunctionPointer((ComputeJobCreateCallbackDelegate) ComputeJobCreate),
- computeJobExecute = CreateFunctionPointer((ComputeJobExecuteCallbackDelegate) ComputeJobExecute),
- computeJobCancel = CreateFunctionPointer((ComputeJobCancelCallbackDelegate) ComputeJobCancel),
- computeJobDestroy = CreateFunctionPointer((ComputeJobDestroyCallbackDelegate) ComputeJobDestroy),
- continuousQueryListenerApply =
- CreateFunctionPointer((ContinuousQueryListenerApplyCallbackDelegate) ContinuousQueryListenerApply),
- continuousQueryFilterCreate =
- CreateFunctionPointer((ContinuousQueryFilterCreateCallbackDelegate) ContinuousQueryFilterCreate),
- continuousQueryFilterApply =
- CreateFunctionPointer((ContinuousQueryFilterApplyCallbackDelegate) ContinuousQueryFilterApply),
- continuousQueryFilterRelease =
- CreateFunctionPointer((ContinuousQueryFilterReleaseCallbackDelegate) ContinuousQueryFilterRelease),
- dataStreamerTopologyUpdate =
- CreateFunctionPointer((DataStreamerTopologyUpdateCallbackDelegate) DataStreamerTopologyUpdate),
- dataStreamerStreamReceiverInvoke =
- CreateFunctionPointer((DataStreamerStreamReceiverInvokeCallbackDelegate) DataStreamerStreamReceiverInvoke),
-
- futureByteResult = CreateFunctionPointer((FutureByteResultCallbackDelegate) FutureByteResult),
- futureBoolResult = CreateFunctionPointer((FutureBoolResultCallbackDelegate) FutureBoolResult),
- futureShortResult = CreateFunctionPointer((FutureShortResultCallbackDelegate) FutureShortResult),
- futureCharResult = CreateFunctionPointer((FutureCharResultCallbackDelegate) FutureCharResult),
- futureIntResult = CreateFunctionPointer((FutureIntResultCallbackDelegate) FutureIntResult),
- futureFloatResult = CreateFunctionPointer((FutureFloatResultCallbackDelegate) FutureFloatResult),
- futureLongResult = CreateFunctionPointer((FutureLongResultCallbackDelegate) FutureLongResult),
- futureDoubleResult = CreateFunctionPointer((FutureDoubleResultCallbackDelegate) FutureDoubleResult),
- futureObjectResult = CreateFunctionPointer((FutureObjectResultCallbackDelegate) FutureObjectResult),
- futureNullResult = CreateFunctionPointer((FutureNullResultCallbackDelegate) FutureNullResult),
- futureError = CreateFunctionPointer((FutureErrorCallbackDelegate) FutureError),
- lifecycleOnEvent = CreateFunctionPointer((LifecycleOnEventCallbackDelegate) LifecycleOnEvent),
- memoryReallocate = CreateFunctionPointer((MemoryReallocateCallbackDelegate) MemoryReallocate),
- nodeInfo = CreateFunctionPointer((NodeInfoCallbackDelegate) NodeInfo),
-
- messagingFilterCreate = CreateFunctionPointer((MessagingFilterCreateCallbackDelegate)MessagingFilterCreate),
- messagingFilterApply = CreateFunctionPointer((MessagingFilterApplyCallbackDelegate)MessagingFilterApply),
- messagingFilterDestroy = CreateFunctionPointer((MessagingFilterDestroyCallbackDelegate)MessagingFilterDestroy),
-
- eventFilterCreate = CreateFunctionPointer((EventFilterCreateCallbackDelegate)EventFilterCreate),
- eventFilterApply = CreateFunctionPointer((EventFilterApplyCallbackDelegate)EventFilterApply),
- eventFilterDestroy = CreateFunctionPointer((EventFilterDestroyCallbackDelegate)EventFilterDestroy),
-
- serviceInit = CreateFunctionPointer((ServiceInitCallbackDelegate)ServiceInit),
- serviceExecute = CreateFunctionPointer((ServiceExecuteCallbackDelegate)ServiceExecute),
- serviceCancel = CreateFunctionPointer((ServiceCancelCallbackDelegate)ServiceCancel),
- serviceInvokeMethod = CreateFunctionPointer((ServiceInvokeMethodCallbackDelegate)ServiceInvokeMethod),
-
- clusterNodeFilterApply = CreateFunctionPointer((ClusterNodeFilterApplyCallbackDelegate)ClusterNodeFilterApply),
-
- onStart = CreateFunctionPointer((OnStartCallbackDelegate)OnStart),
- onStop = CreateFunctionPointer((OnStopCallbackDelegate)OnStop),
error = CreateFunctionPointer((ErrorCallbackDelegate)Error),
- extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong),
- extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong),
-
- onClientDisconnected = CreateFunctionPointer((OnClientDisconnectedDelegate)OnClientDisconnected),
- ocClientReconnected = CreateFunctionPointer((OnClientReconnectedDelegate)OnClientReconnected),
-
- affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit),
- affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition),
- affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions),
- affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode),
- affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy),
-
loggerLog = CreateFunctionPointer((LoggerLogDelegate)LoggerLog),
- loggerIsLevelEnabled = CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled)
+ loggerIsLevelEnabled = CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled),
+
+ inLongOutLong = CreateFunctionPointer((InLongOutLongDelegate)InLongOutLong),
+ inLongLongObjectOutLong = CreateFunctionPointer((InLongLongLongObjectOutLongDelegate)InLongLongLongObjectOutLong)
};
_cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
@@ -294,6 +153,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
Marshal.StructureToPtr(cbs, _cbsPtr, false);
_thisHnd = GCHandle.Alloc(this);
+
+ InitHandlers();
}
/// <summary>
@@ -304,106 +165,257 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
get { return _handleRegistry; }
}
- #region IMPLEMENTATION: CACHE
+ #region HANDLERS
+
+ /// <summary>
+ /// Registers a handler for each supported callback operation in the handler arrays.
+ /// Registrations that pass <c>true</c> as the last argument set the allowUninitialized flag
+ /// of the corresponding handler.
+ /// </summary>
+ private void InitHandlers()
+ {
+ AddHandler(UnmanagedCallbackOp.CacheStoreCreate, CacheStoreCreate, true);
+ AddHandler(UnmanagedCallbackOp.CacheStoreInvoke, CacheStoreInvoke);
+ AddHandler(UnmanagedCallbackOp.CacheStoreDestroy, CacheStoreDestroy);
+ AddHandler(UnmanagedCallbackOp.CacheStoreSessionCreate, CacheStoreSessionCreate);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterCreate, CacheEntryFilterCreate);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterApply, CacheEntryFilterApply);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterDestroy, CacheEntryFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.CacheInvoke, CacheInvoke);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskMap, ComputeTaskMap);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskJobResult, ComputeTaskJobResult);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskReduce, ComputeTaskReduce);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskComplete, ComputeTaskComplete);
+ AddHandler(UnmanagedCallbackOp.ComputeJobSerialize, ComputeJobSerialize);
+ AddHandler(UnmanagedCallbackOp.ComputeJobCreate, ComputeJobCreate);
+ AddHandler(UnmanagedCallbackOp.ComputeJobExecute, ComputeJobExecute);
+ AddHandler(UnmanagedCallbackOp.ComputeJobCancel, ComputeJobCancel);
+ AddHandler(UnmanagedCallbackOp.ComputeJobDestroy, ComputeJobDestroy);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryListenerApply, ContinuousQueryListenerApply);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterCreate, ContinuousQueryFilterCreate);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterApply, ContinuousQueryFilterApply);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterRelease, ContinuousQueryFilterRelease);
+ AddHandler(UnmanagedCallbackOp.DataStreamerTopologyUpdate, DataStreamerTopologyUpdate);
+ AddHandler(UnmanagedCallbackOp.DataStreamerStreamReceiverInvoke, DataStreamerStreamReceiverInvoke);
+ AddHandler(UnmanagedCallbackOp.FutureByteResult, FutureByteResult);
+ AddHandler(UnmanagedCallbackOp.FutureBoolResult, FutureBoolResult);
+ AddHandler(UnmanagedCallbackOp.FutureShortResult, FutureShortResult);
+ AddHandler(UnmanagedCallbackOp.FutureCharResult, FutureCharResult);
+ AddHandler(UnmanagedCallbackOp.FutureIntResult, FutureIntResult);
+ AddHandler(UnmanagedCallbackOp.FutureFloatResult, FutureFloatResult);
+ AddHandler(UnmanagedCallbackOp.FutureLongResult, FutureLongResult);
+ AddHandler(UnmanagedCallbackOp.FutureDoubleResult, FutureDoubleResult);
+ AddHandler(UnmanagedCallbackOp.FutureObjectResult, FutureObjectResult);
+ AddHandler(UnmanagedCallbackOp.FutureNullResult, FutureNullResult);
+ AddHandler(UnmanagedCallbackOp.FutureError, FutureError);
+ AddHandler(UnmanagedCallbackOp.LifecycleOnEvent, LifecycleOnEvent, true);
+ AddHandler(UnmanagedCallbackOp.MemoryReallocate, MemoryReallocate, true);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterCreate, MessagingFilterCreate);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterApply, MessagingFilterApply);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterDestroy, MessagingFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.EventFilterCreate, EventFilterCreate);
+ AddHandler(UnmanagedCallbackOp.EventFilterApply, EventFilterApply);
+ AddHandler(UnmanagedCallbackOp.EventFilterDestroy, EventFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.ServiceInit, ServiceInit);
+ AddHandler(UnmanagedCallbackOp.ServiceExecute, ServiceExecute);
+ AddHandler(UnmanagedCallbackOp.ServiceCancel, ServiceCancel);
+ AddHandler(UnmanagedCallbackOp.ServiceInvokeMethod, ServiceInvokeMethod);
+ AddHandler(UnmanagedCallbackOp.ClusterNodeFilterApply, ClusterNodeFilterApply);
+ AddHandler(UnmanagedCallbackOp.NodeInfo, NodeInfo);
+ AddHandler(UnmanagedCallbackOp.OnStart, OnStart, true);
+ AddHandler(UnmanagedCallbackOp.OnStop, OnStop, true);
+ AddHandler(UnmanagedCallbackOp.ExtensionInLongLongOutLong, ExtensionCallbackInLongLongOutLong, true);
+ AddHandler(UnmanagedCallbackOp.OnClientDisconnected, OnClientDisconnected);
+ AddHandler(UnmanagedCallbackOp.OnClientReconnected, OnClientReconnected);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionInit, AffinityFunctionInit);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionPartition, AffinityFunctionPartition);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionAssignPartitions, AffinityFunctionAssignPartitions);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionRemoveNode, AffinityFunctionRemoveNode);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionDestroy, AffinityFunctionDestroy);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskLocalJobResult, ComputeTaskLocalJobResult);
+ AddHandler(UnmanagedCallbackOp.ComputeJobExecuteLocal, ComputeJobExecuteLocal);
+ }
+
+ /// <summary>
+ /// Registers a single-argument handler: wraps it in an <see cref="InLongOutLongHandler"/> and
+ /// stores it in the in-long-out-long handlers array at the index given by the operation code.
+ /// </summary>
+ /// <param name="op">Callback operation; its integer value is used as the array index.</param>
+ /// <param name="func">Handler function to register.</param>
+ /// <param name="allowUninitialized">Flag passed to the handler wrapper. The dispatcher checks the
+ /// wrapper's AllowUninitialized value and waits on the init event when it is false.</param>
+ private void AddHandler(UnmanagedCallbackOp op, InLongOutLongFunc func, bool allowUninitialized = false)
+ {
+ _inLongOutLongHandlers[(int)op] = new InLongOutLongHandler(func, allowUninitialized);
+ }
+
+ /// <summary>
+ /// Registers a handler taking three long values and a pointer: wraps it in an
+ /// <see cref="InLongLongLongObjectOutLongHandler"/> and stores it in the corresponding handlers
+ /// array at the index given by the operation code.
+ /// </summary>
+ /// <param name="op">Callback operation; its integer value is used as the array index.</param>
+ /// <param name="func">Handler function to register.</param>
+ /// <param name="allowUninitialized">Flag passed to the handler wrapper. The dispatcher checks the
+ /// wrapper's AllowUninitialized value and waits on the init event when it is false.</param>
+ private void AddHandler(UnmanagedCallbackOp op, InLongLongLongObjectOutLongFunc func,
+ bool allowUninitialized = false)
+ {
+ _inLongLongLongObjectOutLongHandlers[(int)op]
+ = new InLongLongLongObjectOutLongHandler(func, allowUninitialized);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: GENERAL PURPOSE
+
+ /// <summary>
+ /// Dispatches a callback to the handler registered for the given operation code.
+ /// Any exception is logged and forwarded to Java via <c>UU.ThrowToJava</c>.
+ /// </summary>
+ /// <param name="target">Target pointer; not used by this method.</param>
+ /// <param name="type">Operation code; used as an index into the handlers array.</param>
+ /// <param name="val">Value passed through to the handler.</param>
+ /// <returns>Handler result, or 0 when an exception was forwarded to Java.</returns>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
+ private long InLongOutLong(void* target, int type, long val)
+ {
+ try
+ {
+ // Valid indexes are 0..Length-1, so type == Length must be rejected here as well;
+ // otherwise the array access below fails with IndexOutOfRangeException instead of
+ // the descriptive invalid-op error.
+ if (type < 0 || type >= _inLongOutLongHandlers.Length)
+ throw GetInvalidOpError("InLongOutLong", type);
+
+ var hnd = _inLongOutLongHandlers[type];
+
+ // In-range slot with no registered handler.
+ if (hnd.Handler == null)
+ throw GetInvalidOpError("InLongOutLong", type);
- private long CacheStoreCreate(void* target, long memPtr)
+ // Block until initialization is signalled unless the handler opted out.
+ if (!hnd.AllowUninitialized)
+ _initEvent.Wait();
+
+ return hnd.Handler(val);
+ }
+ catch (Exception e)
+ {
+ _log.Error(e, "Failure in Java callback");
+
+ UU.ThrowToJava(_ctx.NativeContext, e);
+
+ return 0;
+ }
+ }
+
+ /// <summary>
+ /// Dispatches a callback taking three long values and a pointer to the handler registered
+ /// for the given operation code. Any exception is logged and forwarded to Java via
+ /// <c>UU.ThrowToJava</c>.
+ /// </summary>
+ /// <param name="target">Target pointer; not used by this method.</param>
+ /// <param name="type">Operation code; used as an index into the handlers array.</param>
+ /// <param name="val1">First value passed through to the handler.</param>
+ /// <param name="val2">Second value passed through to the handler.</param>
+ /// <param name="val3">Third value passed through to the handler.</param>
+ /// <param name="arg">Pointer argument passed through to the handler.</param>
+ /// <returns>Handler result, or 0 when an exception was forwarded to Java.</returns>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
+ private long InLongLongLongObjectOutLong(void* target, int type, long val1, long val2, long val3, void* arg)
{
- return SafeCall(() =>
+ try
{
- var cacheStore = CacheStore.CreateInstance(memPtr, _handleRegistry);
+ // Valid indexes are 0..Length-1, so type == Length must be rejected here as well;
+ // otherwise the array access below fails with IndexOutOfRangeException instead of
+ // the descriptive invalid-op error.
+ if (type < 0 || type >= _inLongLongLongObjectOutLongHandlers.Length)
+ throw GetInvalidOpError("InLongLongLongObjectOutLong", type);
+
+ var hnd = _inLongLongLongObjectOutLongHandlers[type];
+
+ // In-range slot with no registered handler.
+ if (hnd.Handler == null)
+ throw GetInvalidOpError("InLongLongLongObjectOutLong", type);
+
+ // Block until initialization is signalled unless the handler opted out.
+ if (!hnd.AllowUninitialized)
+ _initEvent.Wait();
+
+ return hnd.Handler(val1, val2, val3, arg);
+ }
+ catch (Exception e)
+ {
+ _log.Error(e, "Failure in Java callback");
+
+ UU.ThrowToJava(_ctx.NativeContext, e);
+
+ return 0;
+ }
+ }
- if (_ignite != null)
- cacheStore.Init(_ignite);
- else
+ /// <summary>
+ /// Creates the exception that reports an unknown or unregistered callback operation code.
+ /// The exception is returned, not thrown; the caller is expected to throw it.
+ /// </summary>
+ /// <param name="method">Name of the dispatching callback method, included in the message.</param>
+ /// <param name="type">Operation code that could not be dispatched.</param>
+ /// <returns>An <see cref="InvalidOperationException"/> describing the invalid code.</returns>
+ private static Exception GetInvalidOpError(string method, int type)
+ {
+ // Explicit invariant culture: diagnostic text should not depend on the current culture (CA1305).
+ return new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "Invalid {0} callback code: {1}", method, (UnmanagedCallbackOp) type));
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: CACHE
+
+ private long CacheStoreCreate(long memPtr)
+ {
+ var cacheStore = CacheStore.CreateInstance(memPtr, _handleRegistry);
+
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ {
+ lock (_initActions)
{
- lock (_initActions)
- {
- if (_ignite != null)
- cacheStore.Init(_ignite);
- else
- _initActions.Add(cacheStore.Init);
- }
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ _initActions.Add(cacheStore.Init);
}
+ }
- return cacheStore.Handle;
- }, true);
+ return cacheStore.Handle;
}
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
- private int CacheStoreInvoke(void* target, long objPtr, long memPtr)
+ private long CacheStoreInvoke(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var t = _handleRegistry.Get<CacheStore>(objPtr, true);
+ try
+ {
+ var store = _handleRegistry.Get<CacheStore>(stream.ReadLong(), true);
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ return store.Invoke(stream, _ignite);
+ }
+ catch (Exception e)
{
- try
- {
- return t.Invoke(stream, _ignite);
- }
- catch (Exception e)
- {
- stream.Seek(0, SeekOrigin.Begin);
+ stream.Reset();
- _ignite.Marshaller.StartMarshal(stream).WriteObject(e);
+ _ignite.Marshaller.StartMarshal(stream).WriteObject(e);
- return -1;
- }
+ return -1;
}
- });
+ }
}
- private void CacheStoreDestroy(void* target, long objPtr)
+ private long CacheStoreDestroy(long objPtr)
{
- SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ _ignite.HandleRegistry.Release(objPtr);
+
+ return 0;
}
- private long CacheStoreSessionCreate(void* target, long storePtr)
+ private long CacheStoreSessionCreate(long val)
{
- return SafeCall(() => _ignite.HandleRegistry.Allocate(new CacheStoreSession()));
+ return _ignite.HandleRegistry.Allocate(new CacheStoreSession());
}
- private long CacheEntryFilterCreate(void* target, long memPtr)
+ private long CacheEntryFilterCreate(long memPtr)
{
- return SafeCall(() => _handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr, _ignite)));
+ return _handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr, _ignite));
}
- private int CacheEntryFilterApply(void* target, long objPtr, long memPtr)
+ private long CacheEntryFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(objPtr);
+ var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong());
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return t.Invoke(stream);
- }
- });
+ return t.Invoke(stream);
+ }
}
- private void CacheEntryFilterDestroy(void* target, long objPtr)
+ private long CacheEntryFilterDestroy(long objPtr)
{
- SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ _ignite.HandleRegistry.Release(objPtr);
+
+ return 0;
}
- private void CacheInvoke(void* target, long inMemPtr, long outMemPtr)
+ private long CacheInvoke(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- var result = ReadAndRunCacheEntryProcessor(inStream, _ignite);
+ var result = ReadAndRunCacheEntryProcessor(stream, _ignite);
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- result.Write(outStream, _ignite.Marshaller);
+ stream.Reset();
- outStream.SynchronizeOutput();
- }
- }
- });
+ result.Write(stream, _ignite.Marshaller);
+
+ stream.SynchronizeOutput();
+ }
+
+ return 0;
}
/// <summary>
@@ -432,122 +444,110 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: COMPUTE
- private void ComputeTaskMap(void* target, long taskPtr, long inMemPtr, long outMemPtr)
+ private long ComputeTaskMap(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- Task(taskPtr).Map(inStream, outStream);
- }
- }
- });
+ Task(stream.ReadLong()).Map(stream);
+
+ return 0;
+ }
+ }
+
+ private long ComputeTaskLocalJobResult(long taskPtr, long jobPtr, long unused, void* arg)
+ {
+ return Task(taskPtr).JobResultLocal(Job(jobPtr));
}
- private int ComputeTaskJobResult(void* target, long taskPtr, long jobPtr, long memPtr)
+ private long ComputeTaskJobResult(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var task = Task(taskPtr);
+ var task = Task(stream.ReadLong());
- if (memPtr == 0)
- {
- return task.JobResultLocal(Job(jobPtr));
- }
+ var job = Job(stream.ReadLong());
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return task.JobResultRemote(Job(jobPtr), stream);
- }
- });
+ return task.JobResultRemote(job, stream);
+ }
}
- private void ComputeTaskReduce(void* target, long taskPtr)
+ private long ComputeTaskReduce(long taskPtr)
{
- SafeCall(() =>
- {
- var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
+ _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true).Reduce();
- task.Reduce();
- });
+ return 0;
}
- private void ComputeTaskComplete(void* target, long taskPtr, long memPtr)
+ private long ComputeTaskComplete(long taskPtr, long memPtr, long unused, void* arg)
{
- SafeCall(() =>
- {
- var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
+ var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
- if (memPtr == 0)
- task.Complete(taskPtr);
- else
+ if (memPtr == 0)
+ task.Complete(taskPtr);
+ else
+ {
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- task.CompleteWithError(taskPtr, stream);
- }
+ task.CompleteWithError(taskPtr, stream);
}
- });
+ }
+
+ return 0;
}
- private int ComputeJobSerialize(void* target, long jobPtr, long memPtr)
+ private long ComputeJobSerialize(long jobPtr, long memPtr, long unused, void* arg)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return Job(jobPtr).Serialize(stream) ? 1 : 0;
- }
- });
+ return Job(jobPtr).Serialize(stream) ? 1 : 0;
+ }
}
- private long ComputeJobCreate(void* target, long memPtr)
+ private long ComputeJobCreate(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite, stream);
+ ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite, stream);
- return _handleRegistry.Allocate(job);
- }
- });
+ return _handleRegistry.Allocate(job);
+ }
}
- private void ComputeJobExecute(void* target, long jobPtr, int cancel, long memPtr)
+ private long ComputeJobExecuteLocal(long jobPtr, long cancel, long unused, void* arg)
{
- SafeCall(() =>
- {
- var job = Job(jobPtr);
+ Job(jobPtr).ExecuteLocal(cancel == 1);
- if (memPtr == 0)
- job.ExecuteLocal(cancel == 1);
- else
- {
- using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- job.ExecuteRemote(stream, cancel == 1);
- }
- }
- });
+ return 0;
}
- private void ComputeJobCancel(void* target, long jobPtr)
+ private long ComputeJobExecute(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- Job(jobPtr).Cancel();
- });
+ var job = Job(stream.ReadLong());
+
+ var cancel = stream.ReadBool();
+
+ stream.Reset();
+
+ job.ExecuteRemote(stream, cancel);
+ }
+
+ return 0;
}
- private void ComputeJobDestroy(void* target, long jobPtr)
+ private long ComputeJobCancel(long jobPtr)
{
- SafeCall(() =>
- {
- _handleRegistry.Release(jobPtr);
- });
+ Job(jobPtr).Cancel();
+
+ return 0;
+ }
+
+ private long ComputeJobDestroy(long jobPtr)
+ {
+ _handleRegistry.Release(jobPtr);
+
+ return 0;
}
/// <summary>
@@ -574,219 +574,178 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: CONTINUOUS QUERY
- private void ContinuousQueryListenerApply(void* target, long lsnrPtr, long memPtr)
+ private long ContinuousQueryListenerApply(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var hnd = _handleRegistry.Get<IContinuousQueryHandleImpl>(lsnrPtr);
+ var hnd = _handleRegistry.Get<IContinuousQueryHandleImpl>(stream.ReadLong());
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- hnd.Apply(stream);
- }
- });
+ hnd.Apply(stream);
+
+ return 0;
+ }
}
[SuppressMessage("ReSharper", "PossibleNullReferenceException")]
- private long ContinuousQueryFilterCreate(void* target, long memPtr)
+ private long ContinuousQueryFilterCreate(long memPtr)
{
- return SafeCall(() =>
+ // 1. Unmarshal filter holder.
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- // 1. Unmarshal filter holder.
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var filterHolder = reader.ReadObject<ContinuousQueryFilterHolder>();
+ var filterHolder = reader.ReadObject<ContinuousQueryFilterHolder>();
- // 2. Create real filter from it's holder.
- var filter = (IContinuousQueryFilter)DelegateTypeDescriptor.GetContinuousQueryFilterCtor(
- filterHolder.Filter.GetType())(filterHolder.Filter, filterHolder.KeepBinary);
+ // 2. Create real filter from its holder.
+ var filter = (IContinuousQueryFilter) DelegateTypeDescriptor.GetContinuousQueryFilterCtor(
+ filterHolder.Filter.GetType())(filterHolder.Filter, filterHolder.KeepBinary);
- // 3. Inject grid.
- filter.Inject(_ignite);
+ // 3. Inject grid.
+ filter.Inject(_ignite);
- // 4. Allocate GC handle.
- return filter.Allocate();
- }
- });
+ // 4. Allocate GC handle.
+ return filter.Allocate();
+ }
}
- private int ContinuousQueryFilterApply(void* target, long filterPtr, long memPtr)
+ private long ContinuousQueryFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+ var holder = _handleRegistry.Get<IContinuousQueryFilter>(stream.ReadLong());
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Evaluate(stream) ? 1 : 0;
- }
- });
+ return holder.Evaluate(stream) ? 1 : 0;
+ }
}
- private void ContinuousQueryFilterRelease(void* target, long filterPtr)
+ private long ContinuousQueryFilterRelease(long filterPtr)
{
- SafeCall(() =>
- {
- var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+ var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
- holder.Release();
- });
+ holder.Release();
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: DATA STREAMER
- private void DataStreamerTopologyUpdate(void* target, long ldrPtr, long topVer, int topSize)
+ private long DataStreamerTopologyUpdate(long ldrPtr, long topVer, long topSize, void* unused)
{
- SafeCall(() =>
- {
- var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr);
+ var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr);
- if (ldrRef == null)
- return;
+ if (ldrRef == null)
+ return 0;
- var ldr = ldrRef.Target as IDataStreamer;
+ var ldr = ldrRef.Target as IDataStreamer;
- if (ldr != null)
- ldr.TopologyChange(topVer, topSize);
- else
- _handleRegistry.Release(ldrPtr, true);
- });
+ if (ldr != null)
+ ldr.TopologyChange(topVer, (int) topSize);
+ else
+ _handleRegistry.Release(ldrPtr, true);
+
+ return 0;
}
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
- private void DataStreamerStreamReceiverInvoke(void* target, long rcvPtr, void* cache, long memPtr,
- byte keepBinary)
+ private long DataStreamerStreamReceiverInvoke(long memPtr, long unused, long unused1, void* cache)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream, BinaryMode.ForceBinary);
+ var rcvPtr = stream.ReadLong();
- var binaryReceiver = reader.ReadObject<BinaryObject>();
+ var keepBinary = stream.ReadBool();
- var receiver = _handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ??
- binaryReceiver.Deserialize<StreamReceiverHolder>();
+ var reader = _ignite.Marshaller.StartUnmarshal(stream, BinaryMode.ForceBinary);
- if (receiver != null)
- receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx, cache), stream,
- keepBinary != 0);
- }
- });
+ var binaryReceiver = reader.ReadObject<BinaryObject>();
+
+ var receiver = _handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ??
+ binaryReceiver.Deserialize<StreamReceiverHolder>();
+
+ if (receiver != null)
+ receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx, cache), stream, keepBinary);
+
+ return 0;
+ }
}
#endregion
#region IMPLEMENTATION: FUTURES
- private void FutureByteResult(void* target, long futPtr, int res)
+ private long FutureByteResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte)res); });
- });
+ return ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte) res); });
}
- private void FutureBoolResult(void* target, long futPtr, int res)
+ private long FutureBoolResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1); });
- });
+ return ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1); });
}
- private void FutureShortResult(void* target, long futPtr, int res)
+ private long FutureShortResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<short>(futPtr, fut => { fut.OnResult((short)res); });
- });
+ return ProcessFuture<short>(futPtr, fut => { fut.OnResult((short)res); });
}
- private void FutureCharResult(void* target, long futPtr, int res)
+ private long FutureCharResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res); });
- });
+ return ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res); });
}
- private void FutureIntResult(void* target, long futPtr, int res)
+ private long FutureIntResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<int>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<int>(futPtr, fut => { fut.OnResult((int) res); });
}
- private void FutureFloatResult(void* target, long futPtr, float res)
+ private long FutureFloatResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<float>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<float>(futPtr, fut => { fut.OnResult(BinaryUtils.IntToFloatBits((int) res)); });
}
- private void FutureLongResult(void* target, long futPtr, long res)
+ private long FutureLongResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); });
}
- private void FutureDoubleResult(void* target, long futPtr, double res)
+ private long FutureDoubleResult(long futPtr, long res, long unused, void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<double>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<double>(futPtr, fut => { fut.OnResult(BinaryUtils.LongToDoubleBits(res)); });
}
- private void FutureObjectResult(void* target, long futPtr, long memPtr)
+ private long FutureObjectResult(long futPtr, long memPtr, long unused, void* arg)
{
- SafeCall(() =>
+ return ProcessFuture(futPtr, fut =>
{
- ProcessFuture(futPtr, fut =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- fut.OnResult(stream);
- }
- });
+ fut.OnResult(stream);
+ }
});
}
- private void FutureNullResult(void* target, long futPtr)
+ private long FutureNullResult(long futPtr)
{
- SafeCall(() =>
- {
- ProcessFuture(futPtr, fut => { fut.OnNullResult(); });
- });
+ return ProcessFuture(futPtr, fut => { fut.OnNullResult(); });
}
- private void FutureError(void* target, long futPtr, long memPtr)
+ private long FutureError(long futPtr, long memPtr, long unused, void* arg)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- string errCls = reader.ReadString();
- string errMsg = reader.ReadString();
- string stackTrace = reader.ReadString();
- Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null;
+ string errCls = reader.ReadString();
+ string errMsg = reader.ReadString();
+ string stackTrace = reader.ReadString();
+ Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null;
- Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner);
+ Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner);
- ProcessFuture(futPtr, fut => { fut.OnError(err); });
- }
- });
+ return ProcessFuture(futPtr, fut => { fut.OnError(err); });
+ }
}
/// <summary>
@@ -794,11 +753,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/// </summary>
/// <param name="futPtr">Future pointer.</param>
/// <param name="action">Action.</param>
- private void ProcessFuture(long futPtr, Action<IFutureInternal> action)
+ private long ProcessFuture(long futPtr, Action<IFutureInternal> action)
{
try
{
action(_handleRegistry.Get<IFutureInternal>(futPtr, true));
+
+ return 0;
}
finally
{
@@ -811,11 +772,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/// </summary>
/// <param name="futPtr">Future pointer.</param>
/// <param name="action">Action.</param>
- private void ProcessFuture<T>(long futPtr, Action<Future<T>> action)
+ private long ProcessFuture<T>(long futPtr, Action<Future<T>> action)
{
try
{
action(_handleRegistry.Get<Future<T>>(futPtr, true));
+
+ return 0;
}
finally
{
@@ -827,257 +790,230 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: LIFECYCLE
- private void LifecycleOnEvent(void* target, long ptr, int evt)
+ private long LifecycleOnEvent(long ptr, long evt, long unused, void* arg)
{
- SafeCall(() =>
- {
- var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr);
+ var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr);
- bean.OnLifecycleEvent((LifecycleEventType)evt);
- }, true);
+ bean.OnLifecycleEvent((LifecycleEventType) evt);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: MESSAGING
- private long MessagingFilterCreate(void* target, long memPtr)
+ private long MessagingFilterCreate(long memPtr)
{
- return SafeCall(() =>
- {
- MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr);
+ MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr);
- return _ignite.HandleRegistry.AllocateSafe(holder);
- });
+ return _ignite.HandleRegistry.AllocateSafe(holder);
}
- private int MessagingFilterApply(void* target, long ptr, long memPtr)
+ private long MessagingFilterApply(long ptr, long memPtr, long unused, void* arg)
{
- return SafeCall(() =>
- {
- var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
+ var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
- if (holder == null)
- return 0;
+ if (holder == null)
+ return 0;
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Invoke(stream);
- }
- });
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return holder.Invoke(stream);
+ }
}
- private void MessagingFilterDestroy(void* target, long ptr)
+ private long MessagingFilterDestroy(long ptr)
{
- SafeCall(() =>
- {
- _ignite.HandleRegistry.Release(ptr);
- });
+ _ignite.HandleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: EXTENSIONS
- private long ExtensionCallbackInLongOutLong(void* target, int op, long arg1)
+ private long ExtensionCallbackInLongLongOutLong(long op, long arg1, long arg2, void* arg)
{
- throw new InvalidOperationException("Unsupported operation type: " + op);
- }
-
- private long ExtensionCallbackInLongLongOutLong(void* target, int op, long arg1, long arg2)
- {
- return SafeCall(() =>
+ switch (op)
{
- switch (op)
- {
- case OpPrepareDotNet:
- using (var inStream = IgniteManager.Memory.Get(arg1).GetStream())
- using (var outStream = IgniteManager.Memory.Get(arg2).GetStream())
- {
- Ignition.OnPrepare(inStream, outStream, _handleRegistry, _log);
+ case OpPrepareDotNet:
+ using (var inStream = IgniteManager.Memory.Get(arg1).GetStream())
+ using (var outStream = IgniteManager.Memory.Get(arg2).GetStream())
+ {
+ Ignition.OnPrepare(inStream, outStream, _handleRegistry, _log);
- return 0;
- }
+ return 0;
+ }
- default:
- throw new InvalidOperationException("Unsupported operation type: " + op);
- }
- }, op == OpPrepareDotNet);
+ default:
+ throw new InvalidOperationException("Unsupported operation type: " + op);
+ }
}
#endregion
#region IMPLEMENTATION: EVENTS
- private long EventFilterCreate(void* target, long memPtr)
+ private long EventFilterCreate(long memPtr)
{
- return SafeCall(() => _handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr, _ignite)));
+ return _handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr, _ignite));
}
- private int EventFilterApply(void* target, long ptr, long memPtr)
+ private long EventFilterApply(long ptr, long memPtr, long unused, void* arg)
{
- return SafeCall(() =>
- {
- var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr, false);
+ var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr, false);
- if (holder == null)
- return 0;
+ if (holder == null)
+ return 0;
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Invoke(stream);
- }
- });
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return holder.Invoke(stream);
+ }
}
- private void EventFilterDestroy(void* target, long ptr)
+ private long EventFilterDestroy(long ptr)
{
- SafeCall(() =>
- {
- _ignite.HandleRegistry.Release(ptr);
- });
+ _ignite.HandleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: SERVICES
- private long ServiceInit(void* target, long memPtr)
+ private long ServiceInit(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- bool srvKeepBinary = reader.ReadBoolean();
- var svc = reader.ReadObject<IService>();
+ bool srvKeepBinary = reader.ReadBoolean();
+ var svc = reader.ReadObject<IService>();
- ResourceProcessor.Inject(svc, _ignite);
+ ResourceProcessor.Inject(svc, _ignite);
- svc.Init(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+ svc.Init(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
- return _handleRegistry.Allocate(svc);
- }
- });
+ return _handleRegistry.Allocate(svc);
+ }
}
- private void ServiceExecute(void* target, long svcPtr, long memPtr)
+ private long ServiceExecute(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var svc = _handleRegistry.Get<IService>(svcPtr);
+ var svc = _handleRegistry.Get<IService>(stream.ReadLong());
// Ignite does not guarantee that Cancel is called after Execute exits
// So missing handle is a valid situation
if (svc == null)
- return;
+ return 0;
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- bool srvKeepBinary = reader.ReadBoolean();
+ bool srvKeepBinary = reader.ReadBoolean();
- svc.Execute(new ServiceContext(
- _ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
- }
- });
+ svc.Execute(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+
+ return 0;
+ }
}
- private void ServiceCancel(void* target, long svcPtr, long memPtr)
+ private long ServiceCancel(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var svc = _handleRegistry.Get<IService>(svcPtr, true);
+ long svcPtr = stream.ReadLong();
try
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var svc = _handleRegistry.Get<IService>(svcPtr, true);
- bool srvKeepBinary = reader.ReadBoolean();
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- svc.Cancel(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
- }
+ bool srvKeepBinary = reader.ReadBoolean();
+
+ svc.Cancel(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+
+ return 0;
}
finally
{
_ignite.HandleRegistry.Release(svcPtr);
}
- });
+ }
}
- private void ServiceInvokeMethod(void* target, long svcPtr, long inMemPtr, long outMemPtr)
+ private long ServiceInvokeMethod(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
- using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- var svc = _handleRegistry.Get<IService>(svcPtr, true);
+ var svc = _handleRegistry.Get<IService>(stream.ReadLong(), true);
- string mthdName;
- object[] mthdArgs;
+ string mthdName;
+ object[] mthdArgs;
- ServiceProxySerializer.ReadProxyMethod(inStream, _ignite.Marshaller, out mthdName, out mthdArgs);
+ ServiceProxySerializer.ReadProxyMethod(stream, _ignite.Marshaller, out mthdName, out mthdArgs);
- var result = ServiceProxyInvoker.InvokeServiceMethod(svc, mthdName, mthdArgs);
+ var result = ServiceProxyInvoker.InvokeServiceMethod(svc, mthdName, mthdArgs);
- ServiceProxySerializer.WriteInvocationResult(outStream, _ignite.Marshaller, result.Key, result.Value);
+ stream.Reset();
- outStream.SynchronizeOutput();
- }
- });
+ ServiceProxySerializer.WriteInvocationResult(stream, _ignite.Marshaller, result.Key, result.Value);
+
+ stream.SynchronizeOutput();
+
+ return 0;
+ }
}
- private int ClusterNodeFilterApply(void* target, long memPtr)
+ private long ClusterNodeFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var filter = reader.ReadObject<IClusterNodeFilter>();
+ var filter = reader.ReadObject<IClusterNodeFilter>();
- return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 : 0;
- }
- });
+ return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 : 0;
+ }
}
#endregion
#region IMPLEMENTATION: MISCELLANEOUS
- private void NodeInfo(void* target, long memPtr)
+ private long NodeInfo(long memPtr)
{
- SafeCall(() => _ignite.UpdateNodeInfo(memPtr));
+ _ignite.UpdateNodeInfo(memPtr);
+
+ return 0;
}
- private void MemoryReallocate(void* target, long memPtr, int cap)
+ private long MemoryReallocate(long memPtr, long cap, long unused, void* arg)
{
- SafeCall(() =>
- {
- IgniteManager.Memory.Get(memPtr).Reallocate(cap);
- }, true);
+ IgniteManager.Memory.Get(memPtr).Reallocate((int)cap);
+
+ return 0;
}
- private void OnStart(void* target, void* proc, long memPtr)
+ private long OnStart(long memPtr, long unused, long unused1, void* proc)
{
- SafeCall(() =>
+ var proc0 = UU.Acquire(_ctx, proc);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var proc0 = UnmanagedUtils.Acquire(_ctx, proc);
+ Ignition.OnStart(proc0, stream);
+ }
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- Ignition.OnStart(proc0, stream);
- }
- }, true);
+ return 0;
}
- private void OnStop(void* target)
+ private long OnStop(long unused)
{
Marshal.FreeHGlobal(_cbsPtr);
@@ -1092,6 +1028,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
if (ignite != null)
ignite.AfterNodeStop();
+
+ return 0;
}
private void Error(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars,
@@ -1122,20 +1060,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
}
- private void OnClientDisconnected(void* target)
+ private long OnClientDisconnected(long unused)
{
- SafeCall(() =>
- {
- _ignite.OnClientDisconnected();
- });
+ _ignite.OnClientDisconnected();
+
+ return 0;
}
- private void OnClientReconnected(void* target, bool clusterRestarted)
+ private long OnClientReconnected(long clusterRestarted)
{
- SafeCall(() =>
- {
- _ignite.OnClientReconnected(clusterRestarted);
- });
+ _ignite.OnClientReconnected(clusterRestarted != 0);
+
+ return 0;
}
private void LoggerLog(void* target, int level, sbyte* messageChars, int messageCharsLen, sbyte* categoryChars,
@@ -1194,82 +1130,79 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region AffinityFunction
- private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc)
+ private long AffinityFunctionInit(long memPtr, long unused, long unused1, void* baseFunc)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var func = reader.ReadObjectEx<IAffinityFunction>();
+ var func = reader.ReadObjectEx<IAffinityFunction>();
- ResourceProcessor.Inject(func, _ignite);
+ ResourceProcessor.Inject(func, _ignite);
- var affBase = func as AffinityFunctionBase;
+ var affBase = func as AffinityFunctionBase;
- if (affBase != null)
- affBase.SetBaseFunction(new PlatformAffinityFunction(
- _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller));
+ if (affBase != null)
+ {
+ var baseFunc0 = UU.Acquire(_ctx, baseFunc);
- return _handleRegistry.Allocate(func);
+ affBase.SetBaseFunction(new PlatformAffinityFunction(baseFunc0, _ignite.Marshaller));
}
- });
+
+ return _handleRegistry.Allocate(func);
+ }
}
- private int AffinityFunctionPartition(void* target, long ptr, long memPtr)
+ private long AffinityFunctionPartition(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var key = _ignite.Marshaller.Unmarshal<object>(stream);
+ var ptr = stream.ReadLong();
- return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key);
- }
- });
+ var key = _ignite.Marshaller.Unmarshal<object>(stream);
+
+ return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key);
+ }
}
- private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr)
+ private long AffinityFunctionAssignPartitions(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream));
- var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
- var parts = func.AssignPartitions(ctx);
+ var ptr = stream.ReadLong();
+ var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(stream));
+ var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
+ var parts = func.AssignPartitions(ctx);
- if (parts == null)
- throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null");
+ if (parts == null)
+ throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null");
- using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- AffinityFunctionSerializer.WritePartitions(parts, outStream, _ignite.Marshaller);
- }
- }
- });
+ stream.Reset();
+
+ AffinityFunctionSerializer.WritePartitions(parts, stream, _ignite.Marshaller);
+
+ return 0;
+ }
}
- private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr)
+ private long AffinityFunctionRemoveNode(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
+ var ptr = stream.ReadLong();
+ var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
- _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId);
- }
- });
+ _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId);
+
+ return 0;
+ }
}
- private void AffinityFunctionDestroy(void* target, long ptr)
+ private long AffinityFunctionDestroy(long ptr)
{
- SafeCall(() =>
- {
- _handleRegistry.Release(ptr);
- });
+ _handleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
@@ -1315,7 +1248,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
#endregion
-
+
/// <summary>
/// Callbacks pointer.
/// </summary>
@@ -1398,5 +1331,47 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
get { return ConsoleWritePtr; }
}
+
+ /// <summary>
+ /// InLongOutLong handler struct.
+ /// </summary>
+ private struct InLongOutLongHandler
+ {
+ /// <summary> The handler func. </summary>
+ public readonly InLongOutLongFunc Handler;
+
+ /// <summary> Allow uninitialized flag. </summary>
+ public readonly bool AllowUninitialized;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="InLongOutLongHandler"/> struct.
+ /// </summary>
+ public InLongOutLongHandler(InLongOutLongFunc handler, bool allowUninitialized)
+ {
+ Handler = handler;
+ AllowUninitialized = allowUninitialized;
+ }
+ }
+
+ /// <summary>
+ /// InLongLongLongObjectOutLong handler struct.
+ /// </summary>
+ private struct InLongLongLongObjectOutLongHandler
+ {
+ /// <summary> The handler func. </summary>
+ public readonly InLongLongLongObjectOutLongFunc Handler;
+
+ /// <summary> Allow uninitialized flag. </summary>
+ public readonly bool AllowUninitialized;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="InLongLongLongObjectOutLongHandler"/> struct.
+ /// </summary>
+ public InLongLongLongObjectOutLongHandler(InLongLongLongObjectOutLongFunc handler, bool allowUninitialized)
+ {
+ Handler = handler;
+ AllowUninitialized = allowUninitialized;
+ }
+ }
}
}
\ No newline at end of file