You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/04 18:27:44 UTC
[30/55] [abbrv] ignite git commit: IGNITE-1348: Moved GridGain's .Net
module to Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
new file mode 100644
index 0000000..80b33df
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -0,0 +1,1152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Runtime.InteropServices;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Event;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Cache;
+ using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
+ using Apache.Ignite.Core.Impl.Cache.Store;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute;
+ using Apache.Ignite.Core.Impl.Datastream;
+ using Apache.Ignite.Core.Impl.Events;
+ using Apache.Ignite.Core.Impl.Handle;
+ using Apache.Ignite.Core.Impl.Memory;
+ using Apache.Ignite.Core.Impl.Messaging;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Impl.Services;
+ using Apache.Ignite.Core.Lifecycle;
+ using Apache.Ignite.Core.Services;
+ using UU = UnmanagedUtils;
+
+ /// <summary>
+ /// Unmanaged callbacks.
+ /// </summary>
+ [SuppressMessage("ReSharper", "UnusedMember.Local")]
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable",
+ Justification = "This class instance usually lives as long as the app runs.")]
+ [SuppressMessage("Microsoft.Design", "CA1049:TypesThatOwnNativeResourcesShouldBeDisposable",
+ Justification = "This class instance usually lives as long as the app runs.")]
+ internal unsafe class UnmanagedCallbacks
+ {
+ /** Unmanaged context. */
+ private volatile UnmanagedContext _ctx;
+
+ /** Handle registry. */
+ private readonly HandleRegistry _handleRegistry = new HandleRegistry();
+
+ /** Grid. */
+ private volatile Ignite _ignite;
+
+ /** Keep references to created delegates. */
+ // ReSharper disable once CollectionNeverQueried.Local
+ private readonly List<Delegate> _delegates = new List<Delegate>(50);
+
+ /** Initialized flag. */
+ private readonly ManualResetEventSlim _initEvent = new ManualResetEventSlim(false);
+
+ /** Actions to be called upon Ignite initialisation. */
+ private readonly List<Action<Ignite>> _initActions = new List<Action<Ignite>>();
+
+ /** GC handle to UnmanagedCallbacks instance to prevent it from being GCed. */
+ private readonly GCHandle _thisHnd;
+
+ /** Callbacks pointer. */
+ private readonly IntPtr _cbsPtr;
+
+ /** Error type: generic. */
+ private const int ErrGeneric = 1;
+
+ /** Error type: initialize. */
+ private const int ErrJvmInit = 2;
+
+ /** Error type: attach. */
+ private const int ErrJvmAttach = 3;
+
+ /** Opeartion: prepare .Net. */
+ private const int OpPrepareDotNet = 1;
+
+ private delegate long CacheStoreCreateCallbackDelegate(void* target, long memPtr);
+ private delegate int CacheStoreInvokeCallbackDelegate(void* target, long objPtr, long memPtr, void* cb);
+ private delegate void CacheStoreDestroyCallbackDelegate(void* target, long objPtr);
+ private delegate long CacheStoreSessionCreateCallbackDelegate(void* target, long storePtr);
+
+ private delegate long CacheEntryFilterCreateCallbackDelegate(void* target, long memPtr);
+ private delegate int CacheEntryFilterApplyCallbackDelegate(void* target, long objPtr, long memPtr);
+ private delegate void CacheEntryFilterDestroyCallbackDelegate(void* target, long objPtr);
+
+ private delegate void CacheInvokeCallbackDelegate(void* target, long inMemPtr, long outMemPtr);
+
+ private delegate void ComputeTaskMapCallbackDelegate(void* target, long taskPtr, long inMemPtr, long outMemPtr);
+ private delegate int ComputeTaskJobResultCallbackDelegate(void* target, long taskPtr, long jobPtr, long memPtr);
+ private delegate void ComputeTaskReduceCallbackDelegate(void* target, long taskPtr);
+ private delegate void ComputeTaskCompleteCallbackDelegate(void* target, long taskPtr, long memPtr);
+ private delegate int ComputeJobSerializeCallbackDelegate(void* target, long jobPtr, long memPtr);
+ private delegate long ComputeJobCreateCallbackDelegate(void* target, long memPtr);
+ private delegate void ComputeJobExecuteCallbackDelegate(void* target, long jobPtr, int cancel, long memPtr);
+ private delegate void ComputeJobCancelCallbackDelegate(void* target, long jobPtr);
+ private delegate void ComputeJobDestroyCallbackDelegate(void* target, long jobPtr);
+
+ private delegate void ContinuousQueryListenerApplyCallbackDelegate(void* target, long lsnrPtr, long memPtr);
+ private delegate long ContinuousQueryFilterCreateCallbackDelegate(void* target, long memPtr);
+ private delegate int ContinuousQueryFilterApplyCallbackDelegate(void* target, long filterPtr, long memPtr);
+ private delegate void ContinuousQueryFilterReleaseCallbackDelegate(void* target, long filterPtr);
+
+ private delegate void DataStreamerTopologyUpdateCallbackDelegate(void* target, long ldrPtr, long topVer, int topSize);
+ private delegate void DataStreamerStreamReceiverInvokeCallbackDelegate(void* target, long ptr, void* cache, long memPtr, byte keepPortable);
+
+ private delegate void FutureByteResultCallbackDelegate(void* target, long futPtr, int res);
+ private delegate void FutureBoolResultCallbackDelegate(void* target, long futPtr, int res);
+ private delegate void FutureShortResultCallbackDelegate(void* target, long futPtr, int res);
+ private delegate void FutureCharResultCallbackDelegate(void* target, long futPtr, int res);
+ private delegate void FutureIntResultCallbackDelegate(void* target, long futPtr, int res);
+ private delegate void FutureFloatResultCallbackDelegate(void* target, long futPtr, float res);
+ private delegate void FutureLongResultCallbackDelegate(void* target, long futPtr, long res);
+ private delegate void FutureDoubleResultCallbackDelegate(void* target, long futPtr, double res);
+ private delegate void FutureObjectResultCallbackDelegate(void* target, long futPtr, long memPtr);
+ private delegate void FutureNullResultCallbackDelegate(void* target, long futPtr);
+ private delegate void FutureErrorCallbackDelegate(void* target, long futPtr, long memPtr);
+
+ private delegate void LifecycleOnEventCallbackDelegate(void* target, long ptr, int evt);
+
+ private delegate void MemoryReallocateCallbackDelegate(void* target, long memPtr, int cap);
+
+ private delegate long MessagingFilterCreateCallbackDelegate(void* target, long memPtr);
+ private delegate int MessagingFilterApplyCallbackDelegate(void* target, long ptr, long memPtr);
+ private delegate void MessagingFilterDestroyCallbackDelegate(void* target, long ptr);
+
+ private delegate long EventFilterCreateCallbackDelegate(void* target, long memPtr);
+ private delegate int EventFilterApplyCallbackDelegate(void* target, long ptr, long memPtr);
+ private delegate void EventFilterDestroyCallbackDelegate(void* target, long ptr);
+
+ private delegate long ServiceInitCallbackDelegate(void* target, long memPtr);
+ private delegate void ServiceExecuteCallbackDelegate(void* target, long svcPtr, long memPtr);
+ private delegate void ServiceCancelCallbackDelegate(void* target, long svcPtr, long memPtr);
+ private delegate void ServiceInvokeMethodCallbackDelegate(void* target, long svcPtr, long inMemPtr, long outMemPtr);
+
+ private delegate int СlusterNodeFilterApplyCallbackDelegate(void* target, long memPtr);
+
+ private delegate void NodeInfoCallbackDelegate(void* target, long memPtr);
+
+ private delegate void OnStartCallbackDelegate(void* target, long memPtr);
+ private delegate void OnStopCallbackDelegate(void* target);
+
+ private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, void* errData, int errDataLen);
+
+ private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1);
+ private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2);
+
+ /// <summary>
+ /// constructor.
+ /// </summary>
+ public UnmanagedCallbacks()
+ {
+ var cbs = new UnmanagedCallbackHandlers
+ {
+ target = IntPtr.Zero.ToPointer(), // Target is not used in .Net as we rely on dynamic FP creation.
+
+ cacheStoreCreate = CreateFunctionPointer((CacheStoreCreateCallbackDelegate) CacheStoreCreate),
+ cacheStoreInvoke = CreateFunctionPointer((CacheStoreInvokeCallbackDelegate) CacheStoreInvoke),
+ cacheStoreDestroy = CreateFunctionPointer((CacheStoreDestroyCallbackDelegate) CacheStoreDestroy),
+
+ cacheStoreSessionCreate = CreateFunctionPointer((CacheStoreSessionCreateCallbackDelegate) CacheStoreSessionCreate),
+
+ cacheEntryFilterCreate = CreateFunctionPointer((CacheEntryFilterCreateCallbackDelegate)CacheEntryFilterCreate),
+ cacheEntryFilterApply = CreateFunctionPointer((CacheEntryFilterApplyCallbackDelegate)CacheEntryFilterApply),
+ cacheEntryFilterDestroy = CreateFunctionPointer((CacheEntryFilterDestroyCallbackDelegate)CacheEntryFilterDestroy),
+
+ cacheInvoke = CreateFunctionPointer((CacheInvokeCallbackDelegate) CacheInvoke),
+
+ computeTaskMap = CreateFunctionPointer((ComputeTaskMapCallbackDelegate) ComputeTaskMap),
+ computeTaskJobResult =
+ CreateFunctionPointer((ComputeTaskJobResultCallbackDelegate) ComputeTaskJobResult),
+ computeTaskReduce = CreateFunctionPointer((ComputeTaskReduceCallbackDelegate) ComputeTaskReduce),
+ computeTaskComplete = CreateFunctionPointer((ComputeTaskCompleteCallbackDelegate) ComputeTaskComplete),
+ computeJobSerialize = CreateFunctionPointer((ComputeJobSerializeCallbackDelegate) ComputeJobSerialize),
+ computeJobCreate = CreateFunctionPointer((ComputeJobCreateCallbackDelegate) ComputeJobCreate),
+ computeJobExecute = CreateFunctionPointer((ComputeJobExecuteCallbackDelegate) ComputeJobExecute),
+ computeJobCancel = CreateFunctionPointer((ComputeJobCancelCallbackDelegate) ComputeJobCancel),
+ computeJobDestroy = CreateFunctionPointer((ComputeJobDestroyCallbackDelegate) ComputeJobDestroy),
+ continuousQueryListenerApply =
+ CreateFunctionPointer((ContinuousQueryListenerApplyCallbackDelegate) ContinuousQueryListenerApply),
+ continuousQueryFilterCreate =
+ CreateFunctionPointer((ContinuousQueryFilterCreateCallbackDelegate) ContinuousQueryFilterCreate),
+ continuousQueryFilterApply =
+ CreateFunctionPointer((ContinuousQueryFilterApplyCallbackDelegate) ContinuousQueryFilterApply),
+ continuousQueryFilterRelease =
+ CreateFunctionPointer((ContinuousQueryFilterReleaseCallbackDelegate) ContinuousQueryFilterRelease),
+ dataStreamerTopologyUpdate =
+ CreateFunctionPointer((DataStreamerTopologyUpdateCallbackDelegate) DataStreamerTopologyUpdate),
+ dataStreamerStreamReceiverInvoke =
+ CreateFunctionPointer((DataStreamerStreamReceiverInvokeCallbackDelegate) DataStreamerStreamReceiverInvoke),
+
+ futureByteResult = CreateFunctionPointer((FutureByteResultCallbackDelegate) FutureByteResult),
+ futureBoolResult = CreateFunctionPointer((FutureBoolResultCallbackDelegate) FutureBoolResult),
+ futureShortResult = CreateFunctionPointer((FutureShortResultCallbackDelegate) FutureShortResult),
+ futureCharResult = CreateFunctionPointer((FutureCharResultCallbackDelegate) FutureCharResult),
+ futureIntResult = CreateFunctionPointer((FutureIntResultCallbackDelegate) FutureIntResult),
+ futureFloatResult = CreateFunctionPointer((FutureFloatResultCallbackDelegate) FutureFloatResult),
+ futureLongResult = CreateFunctionPointer((FutureLongResultCallbackDelegate) FutureLongResult),
+ futureDoubleResult = CreateFunctionPointer((FutureDoubleResultCallbackDelegate) FutureDoubleResult),
+ futureObjectResult = CreateFunctionPointer((FutureObjectResultCallbackDelegate) FutureObjectResult),
+ futureNullResult = CreateFunctionPointer((FutureNullResultCallbackDelegate) FutureNullResult),
+ futureError = CreateFunctionPointer((FutureErrorCallbackDelegate) FutureError),
+ lifecycleOnEvent = CreateFunctionPointer((LifecycleOnEventCallbackDelegate) LifecycleOnEvent),
+ memoryReallocate = CreateFunctionPointer((MemoryReallocateCallbackDelegate) MemoryReallocate),
+ nodeInfo = CreateFunctionPointer((NodeInfoCallbackDelegate) NodeInfo),
+
+ messagingFilterCreate = CreateFunctionPointer((MessagingFilterCreateCallbackDelegate)MessagingFilterCreate),
+ messagingFilterApply = CreateFunctionPointer((MessagingFilterApplyCallbackDelegate)MessagingFilterApply),
+ messagingFilterDestroy = CreateFunctionPointer((MessagingFilterDestroyCallbackDelegate)MessagingFilterDestroy),
+
+ eventFilterCreate = CreateFunctionPointer((EventFilterCreateCallbackDelegate)EventFilterCreate),
+ eventFilterApply = CreateFunctionPointer((EventFilterApplyCallbackDelegate)EventFilterApply),
+ eventFilterDestroy = CreateFunctionPointer((EventFilterDestroyCallbackDelegate)EventFilterDestroy),
+
+ serviceInit = CreateFunctionPointer((ServiceInitCallbackDelegate)ServiceInit),
+ serviceExecute = CreateFunctionPointer((ServiceExecuteCallbackDelegate)ServiceExecute),
+ serviceCancel = CreateFunctionPointer((ServiceCancelCallbackDelegate)ServiceCancel),
+ serviceInvokeMethod = CreateFunctionPointer((ServiceInvokeMethodCallbackDelegate)ServiceInvokeMethod),
+
+ clusterNodeFilterApply = CreateFunctionPointer((СlusterNodeFilterApplyCallbackDelegate)СlusterNodeFilterApply),
+
+ onStart = CreateFunctionPointer((OnStartCallbackDelegate)OnStart),
+ onStop = CreateFunctionPointer((OnStopCallbackDelegate)OnStop),
+ error = CreateFunctionPointer((ErrorCallbackDelegate)Error),
+
+ extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong),
+ extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong)
+ };
+
+ _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
+
+ Marshal.StructureToPtr(cbs, _cbsPtr, false);
+
+ _thisHnd = GCHandle.Alloc(this);
+ }
+
+ /// <summary>
+ /// Gets the handle registry.
+ /// </summary>
+ public HandleRegistry HandleRegistry
+ {
+ get { return _handleRegistry; }
+ }
+
+ #region IMPLEMENTATION: CACHE
+
+ private long CacheStoreCreate(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var cacheStore = CacheStore.CreateInstance(memPtr, _handleRegistry);
+
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ {
+ lock (_initActions)
+ {
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ _initActions.Add(g => cacheStore.Init(g));
+ }
+ }
+
+ return cacheStore.Handle;
+ }, true);
+ }
+
+ private int CacheStoreInvoke(void* target, long objPtr, long memPtr, void* cb)
+ {
+ return SafeCall(() =>
+ {
+ var t = _handleRegistry.Get<CacheStore>(objPtr, true);
+
+ IUnmanagedTarget cb0 = null;
+
+ if ((long) cb != 0)
+ cb0 = new UnmanagedNonReleaseableTarget(_ctx.NativeContext, cb);
+
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return t.Invoke(stream, cb0, _ignite);
+ }
+ });
+ }
+
+ private void CacheStoreDestroy(void* target, long objPtr)
+ {
+ SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ }
+
+ private long CacheStoreSessionCreate(void* target, long storePtr)
+ {
+ return SafeCall(() => _ignite.HandleRegistry.Allocate(new CacheStoreSession()));
+ }
+
+ private long CacheEntryFilterCreate(void* target, long memPtr)
+ {
+ return SafeCall(() => CacheEntryFilterHolder.CreateInstance(memPtr, _ignite).Handle);
+ }
+
+ private int CacheEntryFilterApply(void* target, long objPtr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(objPtr);
+
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return t.Invoke(stream);
+ }
+ });
+ }
+
+ private void CacheEntryFilterDestroy(void* target, long objPtr)
+ {
+ SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ }
+
+ private void CacheInvoke(void* target, long inMemPtr, long outMemPtr)
+ {
+ SafeCall(() =>
+ {
+ using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).Stream())
+ {
+ var result = ReadAndRunCacheEntryProcessor(inStream, _ignite);
+
+ using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).Stream())
+ {
+ result.Write(outStream, _ignite.Marshaller);
+
+ outStream.SynchronizeOutput();
+ }
+ }
+ });
+ }
+
+ /// <summary>
+ /// Reads cache entry processor and related data from stream, executes it and returns the result.
+ /// </summary>
+ /// <param name="inOutStream">Stream.</param>
+ /// <param name="grid">Grid.</param>
+ /// <returns>CacheEntryProcessor result.</returns>
+ private CacheEntryProcessorResultHolder ReadAndRunCacheEntryProcessor(IPortableStream inOutStream,
+ Ignite grid)
+ {
+ var marsh = grid.Marshaller;
+
+ var key = marsh.Unmarshal<object>(inOutStream);
+ var val = marsh.Unmarshal<object>(inOutStream);
+ var isLocal = inOutStream.ReadBool();
+
+ var holder = isLocal
+ ? _handleRegistry.Get<CacheEntryProcessorHolder>(inOutStream.ReadLong(), true)
+ : marsh.Unmarshal<CacheEntryProcessorHolder>(inOutStream);
+
+ return holder.Process(key, val, val != null, grid);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: COMPUTE
+
+ private void ComputeTaskMap(void* target, long taskPtr, long inMemPtr, long outMemPtr)
+ {
+ SafeCall(() =>
+ {
+ using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).Stream())
+ {
+ using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).Stream())
+ {
+ Task(taskPtr).Map(inStream, outStream);
+ }
+ }
+ });
+ }
+
+ private int ComputeTaskJobResult(void* target, long taskPtr, long jobPtr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var task = Task(taskPtr);
+
+ if (memPtr == 0)
+ {
+ return task.JobResultLocal(Job(jobPtr));
+ }
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return task.JobResultRemote(Job(jobPtr), stream);
+ }
+ });
+ }
+
+ private void ComputeTaskReduce(void* target, long taskPtr)
+ {
+ SafeCall(() =>
+ {
+ var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
+
+ task.Reduce();
+ });
+ }
+
+ private void ComputeTaskComplete(void* target, long taskPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
+
+ if (memPtr == 0)
+ task.Complete(taskPtr);
+ else
+ {
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ task.CompleteWithError(taskPtr, stream);
+ }
+ }
+ });
+ }
+
+ private int ComputeJobSerialize(void* target, long jobPtr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return Job(jobPtr).Serialize(stream) ? 1 : 0;
+ }
+ });
+ }
+
+ private long ComputeJobCreate(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite, stream);
+
+ return _handleRegistry.Allocate(job);
+ }
+ });
+ }
+
+ private void ComputeJobExecute(void* target, long jobPtr, int cancel, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ var job = Job(jobPtr);
+
+ if (memPtr == 0)
+ job.ExecuteLocal(cancel == 1);
+ else
+ {
+ using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ job.ExecuteRemote(stream, cancel == 1);
+ }
+ }
+ });
+ }
+
+ private void ComputeJobCancel(void* target, long jobPtr)
+ {
+ SafeCall(() =>
+ {
+ Job(jobPtr).Cancel();
+ });
+ }
+
+ private void ComputeJobDestroy(void* target, long jobPtr)
+ {
+ SafeCall(() =>
+ {
+ _handleRegistry.Release(jobPtr);
+ });
+ }
+
+ /// <summary>
+ /// Get compute task using it's GC handle pointer.
+ /// </summary>
+ /// <param name="taskPtr">Task pointer.</param>
+ /// <returns>Compute task.</returns>
+ private IComputeTaskHolder Task(long taskPtr)
+ {
+ return _handleRegistry.Get<IComputeTaskHolder>(taskPtr);
+ }
+
+ /// <summary>
+ /// Get comptue job using it's GC handle pointer.
+ /// </summary>
+ /// <param name="jobPtr">Job pointer.</param>
+ /// <returns>Compute job.</returns>
+ private ComputeJobHolder Job(long jobPtr)
+ {
+ return _handleRegistry.Get<ComputeJobHolder>(jobPtr);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: CONTINUOUS QUERY
+
+ private void ContinuousQueryListenerApply(void* target, long lsnrPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ var hnd = _handleRegistry.Get<IContinuousQueryHandleImpl>(lsnrPtr);
+
+ hnd.Apply(IgniteManager.Memory.Get(memPtr).Stream());
+ });
+ }
+
+ [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
+ private long ContinuousQueryFilterCreate(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ // 1. Unmarshal filter holder.
+ IPortableStream stream = IgniteManager.Memory.Get(memPtr).Stream();
+
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ ContinuousQueryFilterHolder filterHolder = reader.ReadObject<ContinuousQueryFilterHolder>();
+
+ // 2. Create real filter from it's holder.
+ Type filterWrapperTyp = typeof(ContinuousQueryFilter<,>)
+ .MakeGenericType(filterHolder.KeyType, filterHolder.ValueType);
+
+ Type filterTyp = typeof(ICacheEntryEventFilter<,>)
+ .MakeGenericType(filterHolder.KeyType, filterHolder.ValueType);
+
+ var filter = (IContinuousQueryFilter)filterWrapperTyp
+ .GetConstructor(new[] { filterTyp, typeof(bool) })
+ .Invoke(new[] { filterHolder.Filter, filterHolder.KeepPortable });
+
+ // 3. Inject grid.
+ filter.Inject(_ignite);
+
+ // 4. Allocate GC handle.
+ return filter.Allocate();
+ });
+ }
+
+ private int ContinuousQueryFilterApply(void* target, long filterPtr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+
+ return holder.Evaluate(IgniteManager.Memory.Get(memPtr).Stream()) ? 1 : 0;
+ });
+ }
+
+ private void ContinuousQueryFilterRelease(void* target, long filterPtr)
+ {
+ SafeCall(() =>
+ {
+ var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+
+ holder.Release();
+ });
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: DATA STREAMER
+
+ private void DataStreamerTopologyUpdate(void* target, long ldrPtr, long topVer, int topSize)
+ {
+ SafeCall(() =>
+ {
+ var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr);
+
+ if (ldrRef == null)
+ return;
+
+ var ldr = ldrRef.Target as IDataStreamer;
+
+ if (ldr != null)
+ ldr.TopologyChange(topVer, topSize);
+ else
+ _handleRegistry.Release(ldrPtr, true);
+ });
+ }
+
+ private void DataStreamerStreamReceiverInvoke(void* target, long rcvPtr, void* cache, long memPtr,
+ byte keepPortable)
+ {
+ SafeCall(() =>
+ {
+ var stream = IgniteManager.Memory.Get(memPtr).Stream();
+
+ var reader = _ignite.Marshaller.StartUnmarshal(stream, PortableMode.ForcePortable);
+
+ var portableReceiver = reader.ReadObject<PortableUserObject>();
+
+ var receiver = _handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ??
+ portableReceiver.Deserialize<StreamReceiverHolder>();
+
+ if (receiver != null)
+ receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx.NativeContext, cache), stream,
+ keepPortable != 0);
+ });
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: FUTURES
+
+ private void FutureByteResult(void* target, long futPtr, int res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte)res); });
+ });
+ }
+
+ private void FutureBoolResult(void* target, long futPtr, int res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1); });
+ });
+ }
+
+ private void FutureShortResult(void* target, long futPtr, int res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<short>(futPtr, fut => { fut.OnResult((short)res); });
+ });
+ }
+
+ private void FutureCharResult(void* target, long futPtr, int res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res); });
+ });
+ }
+
+ private void FutureIntResult(void* target, long futPtr, int res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<int>(futPtr, fut => { fut.OnResult(res); });
+ });
+ }
+
+ private void FutureFloatResult(void* target, long futPtr, float res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<float>(futPtr, fut => { fut.OnResult(res); });
+ });
+ }
+
+ private void FutureLongResult(void* target, long futPtr, long res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); });
+ });
+ }
+
+ private void FutureDoubleResult(void* target, long futPtr, double res)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture<double>(futPtr, fut => { fut.OnResult(res); });
+ });
+ }
+
+ private void FutureObjectResult(void* target, long futPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture(futPtr, fut =>
+ {
+ IPortableStream stream = IgniteManager.Memory.Get(memPtr).Stream();
+
+ fut.OnResult(stream);
+ });
+ });
+ }
+
+ private void FutureNullResult(void* target, long futPtr)
+ {
+ SafeCall(() =>
+ {
+ ProcessFuture(futPtr, fut => { fut.OnNullResult(); });
+ });
+ }
+
+ private void FutureError(void* target, long futPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ IPortableStream stream = IgniteManager.Memory.Get(memPtr).Stream();
+
+ PortableReaderImpl reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ string errCls = reader.ReadString();
+ string errMsg = reader.ReadString();
+
+ Exception err = ExceptionUtils.GetException(errCls, errMsg, reader);
+
+ ProcessFuture(futPtr, fut => { fut.OnError(err); });
+ });
+ }
+
+ /// <summary>
+ /// Process future.
+ /// </summary>
+ /// <param name="futPtr">Future pointer.</param>
+ /// <param name="action">Action.</param>
+ private void ProcessFuture(long futPtr, Action<IFutureInternal> action)
+ {
+ try
+ {
+ action(_handleRegistry.Get<IFutureInternal>(futPtr, true));
+ }
+ finally
+ {
+ _handleRegistry.Release(futPtr);
+ }
+ }
+
+ /// <summary>
+ /// Process future.
+ /// </summary>
+ /// <param name="futPtr">Future pointer.</param>
+ /// <param name="action">Action.</param>
+ private void ProcessFuture<T>(long futPtr, Action<Future<T>> action)
+ {
+ try
+ {
+ action(_handleRegistry.Get<Future<T>>(futPtr, true));
+ }
+ finally
+ {
+ _handleRegistry.Release(futPtr);
+ }
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: LIFECYCLE
+
+ private void LifecycleOnEvent(void* target, long ptr, int evt)
+ {
+ SafeCall(() =>
+ {
+ var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr);
+
+ bean.OnLifecycleEvent((LifecycleEventType)evt);
+ }, true);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: MESSAGING
+
+ private long MessagingFilterCreate(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ MessageFilterHolder holder = MessageFilterHolder.CreateRemote(_ignite, memPtr);
+
+ return _ignite.HandleRegistry.AllocateSafe(holder);
+ });
+ }
+
+ private int MessagingFilterApply(void* target, long ptr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var holder = _ignite.HandleRegistry.Get<MessageFilterHolder>(ptr, false);
+
+ if (holder == null)
+ return 0;
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return holder.Invoke(stream);
+ }
+ });
+ }
+
+ private void MessagingFilterDestroy(void* target, long ptr)
+ {
+ SafeCall(() =>
+ {
+ _ignite.HandleRegistry.Release(ptr);
+ });
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: EXTENSIONS
+
+ private long ExtensionCallbackInLongOutLong(void* target, int op, long arg1)
+ {
+ throw new InvalidOperationException("Unsupported operation type: " + op);
+ }
+
+ private long ExtensionCallbackInLongLongOutLong(void* target, int op, long arg1, long arg2)
+ {
+ return SafeCall(() =>
+ {
+ switch (op)
+ {
+ case OpPrepareDotNet:
+ var inMem = IgniteManager.Memory.Get(arg1);
+ var outMem = IgniteManager.Memory.Get(arg2);
+
+ PlatformMemoryStream inStream = inMem.Stream();
+ PlatformMemoryStream outStream = outMem.Stream();
+
+ Ignition.OnPrepare(inStream, outStream, _handleRegistry);
+
+ return 0;
+
+ default:
+ throw new InvalidOperationException("Unsupported operation type: " + op);
+ }
+ }, op == OpPrepareDotNet);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: EVENTS
+
+ private long EventFilterCreate(void* target, long memPtr)
+ {
+ return SafeCall(() => _handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr, _ignite)));
+ }
+
+ private int EventFilterApply(void* target, long ptr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr, false);
+
+ if (holder == null)
+ return 0;
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ return holder.Invoke(stream);
+ }
+ });
+ }
+
+ private void EventFilterDestroy(void* target, long ptr)
+ {
+ SafeCall(() =>
+ {
+ _ignite.HandleRegistry.Release(ptr);
+ });
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: SERVICES
+
+ private long ServiceInit(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ bool srvKeepPortable = reader.ReadBoolean();
+ var svc = reader.ReadObject<IService>();
+
+ ResourceProcessor.Inject(svc, _ignite);
+
+ svc.Init(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepPortable)));
+
+ return _handleRegistry.Allocate(svc);
+ }
+ });
+ }
+
+ private void ServiceExecute(void* target, long svcPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ var svc = _handleRegistry.Get<IService>(svcPtr, true);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ bool srvKeepPortable = reader.ReadBoolean();
+
+ svc.Execute(new ServiceContext(
+ _ignite.Marshaller.StartUnmarshal(stream, srvKeepPortable)));
+ }
+ });
+ }
+
+ private void ServiceCancel(void* target, long svcPtr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ var svc = _handleRegistry.Get<IService>(svcPtr, true);
+
+ try
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ bool srvKeepPortable = reader.ReadBoolean();
+
+ svc.Cancel(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepPortable)));
+ }
+ }
+ finally
+ {
+ _ignite.HandleRegistry.Release(svcPtr);
+ }
+ });
+ }
+
+ private void ServiceInvokeMethod(void* target, long svcPtr, long inMemPtr, long outMemPtr)
+ {
+ SafeCall(() =>
+ {
+ using (var inStream = IgniteManager.Memory.Get(inMemPtr).Stream())
+ using (var outStream = IgniteManager.Memory.Get(outMemPtr).Stream())
+ {
+ var svc = _handleRegistry.Get<IService>(svcPtr, true);
+
+ string mthdName;
+ object[] mthdArgs;
+
+ ServiceProxySerializer.ReadProxyMethod(inStream, _ignite.Marshaller, out mthdName, out mthdArgs);
+
+ var result = ServiceProxyInvoker.InvokeServiceMethod(svc, mthdName, mthdArgs);
+
+ ServiceProxySerializer.WriteInvocationResult(outStream, _ignite.Marshaller, result.Key, result.Value);
+
+ outStream.SynchronizeOutput();
+ }
+ });
+ }
+
+ private int СlusterNodeFilterApply(void* target, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ var filter = (IClusterNodeFilter) reader.ReadObject<PortableOrSerializableObjectHolder>().Item;
+
+ return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 : 0;
+ }
+ });
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: MISCELLANEOUS
+
+ private void NodeInfo(void* target, long memPtr)
+ {
+ SafeCall(() => _ignite.UpdateNodeInfo(memPtr));
+ }
+
+ private void MemoryReallocate(void* target, long memPtr, int cap)
+ {
+ SafeCall(() =>
+ {
+ IgniteManager.Memory.Get(memPtr).Reallocate(cap);
+ }, true);
+ }
+
+ private void OnStart(void* target, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ Ignition.OnStart(IgniteManager.Memory.Get(memPtr).Stream());
+ }, true);
+ }
+
+ private void OnStop(void* target)
+ {
+ Marshal.FreeHGlobal(_cbsPtr);
+
+ // ReSharper disable once ImpureMethodCallOnReadonlyValueField
+ _thisHnd.Free();
+
+ // Allow context to be collected, which will cause resource cleanup in finalizer.
+ _ctx = null;
+ }
+
+ private void Error(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars,
+ int errMsgCharsLen, void* errData, int errDataLen)
+ {
+ string errCls = IgniteUtils.Utf8UnmanagedToString(errClsChars, errClsCharsLen);
+ string errMsg = IgniteUtils.Utf8UnmanagedToString(errMsgChars, errMsgCharsLen);
+
+ switch (errType)
+ {
+ case ErrGeneric:
+ if (_ignite != null && errDataLen > 0)
+ throw ExceptionUtils.GetException(errCls, errMsg,
+ _ignite.Marshaller.StartUnmarshal(new PlatformRawMemory(errData, errDataLen).Stream()));
+
+ throw ExceptionUtils.GetException(errCls, errMsg);
+
+ case ErrJvmInit:
+ throw ExceptionUtils.GetJvmInitializeException(errCls, errMsg);
+
+ case ErrJvmAttach:
+ throw new IgniteException("Failed to attach to JVM.");
+
+ default:
+ throw new IgniteException("Unknown exception [cls=" + errCls + ", msg=" + errMsg + ']');
+ }
+ }
+
+ #endregion
+
+ #region HELPERS
+
+ private void SafeCall(Action func, bool allowUnitialized = false)
+ {
+ if (!allowUnitialized)
+ _initEvent.Wait();
+
+ try
+ {
+ func();
+ }
+ catch (Exception e)
+ {
+ UU.ThrowToJava(_ctx.NativeContext, e);
+ }
+ }
+
+ private T SafeCall<T>(Func<T> func, bool allowUnitialized = false)
+ {
+ if (!allowUnitialized)
+ _initEvent.Wait();
+
+ try
+ {
+ return func();
+ }
+ catch (Exception e)
+ {
+ UU.ThrowToJava(_ctx.NativeContext, e);
+
+ return default(T);
+ }
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Callbacks pointer.
+ /// </summary>
+ public void* CallbacksPointer
+ {
+ get { return _cbsPtr.ToPointer(); }
+ }
+
+ /// <summary>
+ /// Gets the context.
+ /// </summary>
+ public UnmanagedContext Context
+ {
+ get { return _ctx; }
+ }
+
+ /// <summary>
+ /// Create function pointer for the given function.
+ /// </summary>
+ private void* CreateFunctionPointer(Delegate del)
+ {
+ _delegates.Add(del); // Prevent delegate from being GC-ed.
+
+ return Marshal.GetFunctionPointerForDelegate(del).ToPointer();
+ }
+
+ /// <param name="context">Context.</param>
+ public void SetContext(void* context)
+ {
+ Debug.Assert(context != null);
+ Debug.Assert(_ctx == null);
+
+ _ctx = new UnmanagedContext(context);
+ }
+
+ /// <summary>
+ /// Initializes this instance with grid.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ public void Initialize(Ignite grid)
+ {
+ Debug.Assert(grid != null);
+
+ _ignite = grid;
+
+ lock (_initActions)
+ {
+ _initActions.ForEach(x => x(grid));
+
+ _initActions.Clear();
+ }
+
+ _initEvent.Set();
+ }
+
+ /// <summary>
+ /// Cleanups this instance.
+ /// </summary>
+ public void Cleanup()
+ {
+ _ignite = null;
+
+ _handleRegistry.Close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedContext.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedContext.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedContext.cs
new file mode 100644
index 0000000..89d2071
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedContext.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+ /// <summary>
+ /// Unmanaged context.
+ /// Wrapper around native ctx pointer to track finalization.
+ /// </summary>
+ internal unsafe class UnmanagedContext
+ {
+ /** Context */
+ private readonly void* _nativeCtx;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ public UnmanagedContext(void* ctx)
+ {
+ _nativeCtx = ctx;
+ }
+
+ /// <summary>
+ /// Gets the native context pointer.
+ /// </summary>
+ public void* NativeContext
+ {
+ get { return _nativeCtx; }
+ }
+
+ /// <summary>
+ /// Destructor.
+ /// </summary>
+ ~UnmanagedContext()
+ {
+ UnmanagedUtils.DeleteContext(_nativeCtx); // Release CPP object.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedNonReleaseableTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedNonReleaseableTarget.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedNonReleaseableTarget.cs
new file mode 100644
index 0000000..24db5a5
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedNonReleaseableTarget.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+ using System;
+
+ /// <summary>
+ /// Unmanaged target which does not require explicit release.
+ /// </summary>
+ internal unsafe class UnmanagedNonReleaseableTarget : IUnmanagedTarget
+ {
+ /** Context. */
+ private readonly void* _ctx;
+
+ /** Target. */
+ private readonly void* _target;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="ctx">Context.</param>
+ /// <param name="target">Target.</param>
+ public UnmanagedNonReleaseableTarget(void* ctx, void* target)
+ {
+ _ctx = ctx;
+ _target = target;
+ }
+
+ /** <inheritdoc /> */
+ public void* Context
+ {
+ get { return _ctx; }
+ }
+
+ /** <inheritdoc /> */
+ public void* Target
+ {
+ get { return _target; }
+ }
+
+ /** <inheritdoc /> */
+ public IUnmanagedTarget ChangeTarget(void* target)
+ {
+ throw new NotSupportedException();
+ }
+
+ /** <inheritdoc /> */
+ public void Dispose()
+ {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedTarget.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedTarget.cs
new file mode 100644
index 0000000..e54a199
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedTarget.cs
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+ using System;
+ using System.Runtime.InteropServices;
+ using UU = UnmanagedUtils;
+
+ /// <summary>
+ /// Base unmanaged target implementation.
+ /// </summary>
+ internal unsafe sealed class UnmanagedTarget : CriticalHandle, IUnmanagedTarget
+ {
+ /** Context. */
+ private readonly UnmanagedContext _ctx;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="ctx">Context.</param>
+ /// <param name="target">Target.</param>
+ public UnmanagedTarget(UnmanagedContext ctx, void* target)
+ : base(IntPtr.Zero)
+ {
+ _ctx = ctx;
+
+ SetHandle(new IntPtr(target));
+ }
+
+ /** <inheritdoc /> */
+ public void* Context
+ {
+ get { return _ctx.NativeContext; }
+ }
+
+ /** <inheritdoc /> */
+ public void* Target
+ {
+ get { return handle.ToPointer(); }
+ }
+
+ /** <inheritdoc /> */
+ public IUnmanagedTarget ChangeTarget(void* target)
+ {
+ return new UnmanagedTarget(_ctx, target);
+ }
+
+ /** <inheritdoc /> */
+ protected override bool ReleaseHandle()
+ {
+ UU.Release(this);
+
+ return true;
+ }
+
+ /** <inheritdoc /> */
+ public override bool IsInvalid
+ {
+ get { return handle == IntPtr.Zero; }
+ }
+ }
+}