You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/21 16:27:36 UTC
[40/52] [partial] ignite git commit: IGNITE-1513: Moved .Net.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
new file mode 100644
index 0000000..bf11397
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Threading;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Datastream;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Data streamer internal interface to get rid of generics.
+ /// </summary>
+ internal interface IDataStreamer
+ {
+ /// <summary>
+ /// Callback invoked on topology size change.
+ /// </summary>
+ /// <param name="topVer">New topology version.</param>
+ /// <param name="topSize">New topology size.</param>
+ void TopologyChange(long topVer, int topSize);
+ }
+
+ /// <summary>
+ /// Data streamer implementation.
+ /// </summary>
+ internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV>
+ {
+
+#pragma warning disable 0420
+
+ /** Policy: continue. */
+ internal const int PlcContinue = 0;
+
+ /** Policy: close. */
+ internal const int PlcClose = 1;
+
+ /** Policy: cancel and close. */
+ internal const int PlcCancelClose = 2;
+
+ /** Policy: flush. */
+ internal const int PlcFlush = 3;
+
+ /** Operation: update. */
+ private const int OpUpdate = 1;
+
+ /** Operation: set receiver. */
+ private const int OpReceiver = 2;
+
+ /** Cache name. */
+ private readonly string _cacheName;
+
+ /** Lock. */
+ private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
+
+ /** Closed event. */
+ private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false);
+
+ /** Close future. */
+ private readonly Future<object> _closeFut = new Future<object>();
+
+ /** GC handle to this streamer. */
+ private readonly long _hnd;
+
+ /** Topology version. */
+ private long _topVer;
+
+ /** Topology size. */
+ private int _topSize;
+
+ /** Buffer send size. */
+ private volatile int _bufSndSize;
+
+ /** Current data streamer batch. */
+ private volatile DataStreamerBatch<TK, TV> _batch;
+
+ /** Flusher. */
+ private readonly Flusher<TK, TV> _flusher;
+
+ /** Receiver. */
+ private volatile IStreamReceiver<TK, TV> _rcv;
+
+ /** Receiver handle. */
+ private long _rcvHnd;
+
+ /** Receiver portable mode. */
+ private readonly bool _keepPortable;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="cacheName">Cache name.</param>
+ /// <param name="keepPortable">Portable flag.</param>
+ public DataStreamerImpl(IUnmanagedTarget target, PortableMarshaller marsh, string cacheName, bool keepPortable)
+ : base(target, marsh)
+ {
+ _cacheName = cacheName;
+ _keepPortable = keepPortable;
+
+ // Create empty batch.
+ _batch = new DataStreamerBatch<TK, TV>();
+
+ // Allocate GC handle so that this data streamer could be easily dereferenced from native code.
+ WeakReference thisRef = new WeakReference(this);
+
+ _hnd = marsh.Ignite.HandleRegistry.Allocate(thisRef);
+
+ // Start topology listening. This call will ensure that buffer size member is updated.
+ UU.DataStreamerListenTopology(target, _hnd);
+
+ // Membar to ensure fields initialization before leaving constructor.
+ Thread.MemoryBarrier();
+
+ // Start flusher after everything else is initialized.
+ _flusher = new Flusher<TK, TV>(thisRef);
+
+ _flusher.RunThread();
+ }
+
+ /** <inheritDoc /> */
+ public string CacheName
+ {
+ get { return _cacheName; }
+ }
+
+ /** <inheritDoc /> */
+ public bool AllowOverwrite
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return UU.DataStreamerAllowOverwriteGet(Target);
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ UU.DataStreamerAllowOverwriteSet(Target, value);
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public bool SkipStore
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return UU.DataStreamerSkipStoreGet(Target);
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ UU.DataStreamerSkipStoreSet(Target, value);
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public int PerNodeBufferSize
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return UU.DataStreamerPerNodeBufferSizeGet(Target);
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ UU.DataStreamerPerNodeBufferSizeSet(Target, value);
+
+ _bufSndSize = _topSize * value;
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public int PerNodeParallelOperations
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return UU.DataStreamerPerNodeParallelOperationsGet(Target);
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ UU.DataStreamerPerNodeParallelOperationsSet(Target, value);
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ }
+ }
+
+ /** <inheritDoc /> */
+ public long AutoFlushFrequency
+ {
+ get
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ return _flusher.Frequency;
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+
+ }
+ set
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ _flusher.Frequency = value;
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IFuture Future
+ {
+ get
+ {
+ ThrowIfDisposed();
+
+ return _closeFut;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IStreamReceiver<TK, TV> Receiver
+ {
+ get
+ {
+ ThrowIfDisposed();
+
+ return _rcv;
+ }
+ set
+ {
+ IgniteArgumentCheck.NotNull(value, "value");
+
+ var handleRegistry = Marshaller.Ignite.HandleRegistry;
+
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ if (_rcv == value)
+ return;
+
+ var rcvHolder = new StreamReceiverHolder(value,
+ (rec, grid, cache, stream, keepPortable) =>
+ StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
+ keepPortable));
+
+ var rcvHnd0 = handleRegistry.Allocate(rcvHolder);
+
+ try
+ {
+ DoOutOp(OpReceiver, w =>
+ {
+ w.WriteLong(rcvHnd0);
+
+ w.WriteObject(rcvHolder);
+ });
+ }
+ catch (Exception)
+ {
+ handleRegistry.Release(rcvHnd0);
+ throw;
+ }
+
+ if (_rcv != null)
+ handleRegistry.Release(_rcvHnd);
+
+ _rcv = value;
+ _rcvHnd = rcvHnd0;
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IFuture AddData(TK key, TV val)
+ {
+ ThrowIfDisposed();
+
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return Add0(new DataStreamerEntry<TK, TV>(key, val), 1);
+ }
+
+ /** <inheritDoc /> */
+ public IFuture AddData(KeyValuePair<TK, TV> pair)
+ {
+ ThrowIfDisposed();
+
+ return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1);
+ }
+
+ /** <inheritDoc /> */
+ public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries)
+ {
+ ThrowIfDisposed();
+
+ IgniteArgumentCheck.NotNull(entries, "entries");
+
+ return Add0(entries, entries.Count);
+ }
+
+ /** <inheritDoc /> */
+ public IFuture RemoveData(TK key)
+ {
+ ThrowIfDisposed();
+
+ IgniteArgumentCheck.NotNull(key, "key");
+
+ return Add0(new DataStreamerRemoveEntry<TK>(key), 1);
+ }
+
+ /** <inheritDoc /> */
+ public void TryFlush()
+ {
+ ThrowIfDisposed();
+
+ DataStreamerBatch<TK, TV> batch0 = _batch;
+
+ if (batch0 != null)
+ Flush0(batch0, false, PlcFlush);
+ }
+
+ /** <inheritDoc /> */
+ public void Flush()
+ {
+ ThrowIfDisposed();
+
+ DataStreamerBatch<TK, TV> batch0 = _batch;
+
+ if (batch0 != null)
+ Flush0(batch0, true, PlcFlush);
+ else
+ {
+ // Batch is null, i.e. data streamer is closing. Wait for close to complete.
+ _closedEvt.Wait();
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void Close(bool cancel)
+ {
+ _flusher.Stop();
+
+ while (true)
+ {
+ DataStreamerBatch<TK, TV> batch0 = _batch;
+
+ if (batch0 == null)
+ {
+ // Wait for concurrent close to finish.
+ _closedEvt.Wait();
+
+ return;
+ }
+
+ if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
+ {
+ _closeFut.OnDone(null, null);
+
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ base.Dispose(true);
+
+ if (_rcv != null)
+ Marshaller.Ignite.HandleRegistry.Release(_rcvHnd);
+
+ _closedEvt.Set();
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ Marshaller.Ignite.HandleRegistry.Release(_hnd);
+
+ break;
+ }
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IDataStreamer<TK1, TV1> WithKeepPortable<TK1, TV1>()
+ {
+ if (_keepPortable)
+ {
+ var result = this as IDataStreamer<TK1, TV1>;
+
+ if (result == null)
+ throw new InvalidOperationException(
+ "Can't change type of portable streamer. WithKeepPortable has been called on an instance of " +
+ "portable streamer with incompatible generic arguments.");
+
+ return result;
+ }
+
+ return new DataStreamerImpl<TK1, TV1>(UU.ProcessorDataStreamer(Marshaller.Ignite.InteropProcessor,
+ _cacheName, true), Marshaller, _cacheName, true);
+ }
+
+ /** <inheritDoc /> */
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ Close(false); // Normal dispose: do not cancel
+ else
+ {
+ // Finalizer: just close Java streamer
+ try
+ {
+ if (_batch != null)
+ _batch.Send(this, PlcCancelClose);
+ }
+ catch (Exception)
+ {
+ // Finalizers should never throw
+ }
+
+ Marshaller.Ignite.HandleRegistry.Release(_hnd, true);
+ Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true);
+
+ base.Dispose(false);
+ }
+ }
+
+ /** <inheritDoc /> */
+ ~DataStreamerImpl()
+ {
+ Dispose(false);
+ }
+
+ /** <inheritDoc /> */
+ public void TopologyChange(long topVer, int topSize)
+ {
+ _rwLock.EnterWriteLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ if (_topVer < topVer)
+ {
+ _topVer = topVer;
+ _topSize = topSize;
+
+ _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target);
+ }
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ }
+
+ /// <summary>
+ /// Internal add/remove routine.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <param name="cnt">Items count.</param>
+ /// <returns>Future.</returns>
+ private IFuture Add0(object val, int cnt)
+ {
+ int bufSndSize0 = _bufSndSize;
+
+ while (true)
+ {
+ var batch0 = _batch;
+
+ if (batch0 == null)
+ throw new InvalidOperationException("Data streamer is stopped.");
+
+ int size = batch0.Add(val, cnt);
+
+ if (size == -1)
+ {
+ // Batch is blocked, perform CAS.
+ Interlocked.CompareExchange(ref _batch,
+ new DataStreamerBatch<TK, TV>(batch0), batch0);
+
+ continue;
+ }
+ if (size >= bufSndSize0)
+ // Batch is too big, schedule flush.
+ Flush0(batch0, false, PlcContinue);
+
+ return batch0.Future;
+ }
+ }
+
+ /// <summary>
+ /// Internal flush routine.
+ /// </summary>
+ /// <param name="curBatch"></param>
+ /// <param name="wait">Whether to wait for flush to complete.</param>
+ /// <param name="plc">Whether this is the last batch.</param>
+ /// <returns>Whether this call was able to CAS previous batch</returns>
+ private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
+ {
+ // 1. Try setting new current batch to help further adders.
+ bool res = Interlocked.CompareExchange(ref _batch,
+ (plc == PlcContinue || plc == PlcFlush) ?
+ new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
+
+ // 2. Perform actual send.
+ curBatch.Send(this, plc);
+
+ if (wait)
+ // 3. Wait for all futures to finish.
+ curBatch.AwaitCompletion();
+
+ return res;
+ }
+
+ /// <summary>
+ /// Start write.
+ /// </summary>
+ /// <returns>Writer.</returns>
+ internal void Update(Action<PortableWriterImpl> action)
+ {
+ _rwLock.EnterReadLock();
+
+ try
+ {
+ ThrowIfDisposed();
+
+ DoOutOp(OpUpdate, action);
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+ }
+
+ /// <summary>
+ /// Flusher.
+ /// </summary>
+ private class Flusher<TK1, TV1>
+ {
+ /** State: running. */
+ private const int StateRunning = 0;
+
+ /** State: stopping. */
+ private const int StateStopping = 1;
+
+ /** State: stopped. */
+ private const int StateStopped = 2;
+
+ /** Data streamer. */
+ private readonly WeakReference _ldrRef;
+
+ /** Finish flag. */
+ private int _state;
+
+ /** Flush frequency. */
+ private long _freq;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="ldrRef">Data streamer weak reference..</param>
+ public Flusher(WeakReference ldrRef)
+ {
+ _ldrRef = ldrRef;
+
+ lock (this)
+ {
+ _state = StateRunning;
+ }
+ }
+
+ /// <summary>
+ /// Main flusher routine.
+ /// </summary>
+ private void Run()
+ {
+ bool force = false;
+ long curFreq = 0;
+
+ try
+ {
+ while (true)
+ {
+ if (curFreq > 0 || force)
+ {
+ var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>;
+
+ if (ldr == null)
+ return;
+
+ ldr.TryFlush();
+
+ force = false;
+ }
+
+ lock (this)
+ {
+ // Stop immediately.
+ if (_state == StateStopping)
+ return;
+
+ if (curFreq == _freq)
+ {
+ // Frequency is unchanged
+ if (curFreq == 0)
+ // Just wait for a second and re-try.
+ Monitor.Wait(this, 1000);
+ else
+ {
+ // Calculate remaining time.
+ DateTime now = DateTime.Now;
+
+ long ticks;
+
+ try
+ {
+ ticks = now.AddMilliseconds(curFreq).Ticks - now.Ticks;
+
+ if (ticks > int.MaxValue)
+ ticks = int.MaxValue;
+ }
+ catch (ArgumentOutOfRangeException)
+ {
+ // Handle possible overflow.
+ ticks = int.MaxValue;
+ }
+
+ Monitor.Wait(this, TimeSpan.FromTicks(ticks));
+ }
+ }
+ else
+ {
+ if (curFreq != 0)
+ force = true;
+
+ curFreq = _freq;
+ }
+ }
+ }
+ }
+ finally
+ {
+ // Let streamer know about stop.
+ lock (this)
+ {
+ _state = StateStopped;
+
+ Monitor.PulseAll(this);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Frequency.
+ /// </summary>
+ public long Frequency
+ {
+ get
+ {
+ return Interlocked.Read(ref _freq);
+ }
+
+ set
+ {
+ lock (this)
+ {
+ if (_freq != value)
+ {
+ _freq = value;
+
+ Monitor.PulseAll(this);
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Stop flusher.
+ /// </summary>
+ public void Stop()
+ {
+ lock (this)
+ {
+ if (_state == StateRunning)
+ {
+ _state = StateStopping;
+
+ Monitor.PulseAll(this);
+ }
+
+ while (_state != StateStopped)
+ Monitor.Wait(this);
+ }
+ }
+
+ /// <summary>
+ /// Runs the flusher thread.
+ /// </summary>
+ public void RunThread()
+ {
+ new Thread(Run).Start();
+ }
+ }
+
+#pragma warning restore 0420
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
new file mode 100644
index 0000000..7e65934
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+ /// <summary>
+ /// Remove marker.
+ /// </summary>
+ internal class DataStreamerRemoveEntry<TK>
+ {
+ /** Key to remove. */
+ private readonly TK _key;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ public DataStreamerRemoveEntry(TK key)
+ {
+ _key = key;
+ }
+
+ /// <summary>
+ /// Key.
+ /// </summary>
+ public TK Key
+ {
+ get
+ {
+ return _key;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
new file mode 100644
index 0000000..5a7c104
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Datastream
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Datastream;
+ using Apache.Ignite.Core.Impl.Cache;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Portable wrapper for <see cref="IStreamReceiver{TK,TV}"/>.
+ /// </summary>
+ internal class StreamReceiverHolder : IPortableWriteAware
+ {
+ /** */
+ private const byte RcvNormal = 0;
+
+ /** */
+ public const byte RcvTransformer = 1;
+
+ /** Generic receiver. */
+ private readonly object _rcv;
+
+ /** Invoker delegate. */
+ private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _invoke;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public StreamReceiverHolder(PortableReaderImpl reader)
+ {
+ var rcvType = reader.ReadByte();
+
+ _rcv = PortableUtils.ReadPortableOrSerializable<object>(reader);
+
+ Debug.Assert(_rcv != null);
+
+ var type = _rcv.GetType();
+
+ if (rcvType == RcvTransformer)
+ {
+ // rcv is a user ICacheEntryProcessor<K, V, A, R>, construct StreamTransformer from it.
+ // (we can't marshal StreamTransformer directly, because it is generic,
+ // and we do not know type arguments that user will have)
+ _rcv = DelegateTypeDescriptor.GetStreamTransformerCtor(type)(_rcv);
+ }
+
+ _invoke = DelegateTypeDescriptor.GetStreamReceiver(_rcv.GetType());
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class.
+ /// </summary>
+ /// <param name="rcv">Receiver.</param>
+ /// <param name="invoke">Invoke delegate.</param>
+ public StreamReceiverHolder(object rcv,
+ Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> invoke)
+ {
+ Debug.Assert(rcv != null);
+ Debug.Assert(invoke != null);
+
+ _rcv = rcv;
+ _invoke = invoke;
+ }
+
+ /** <inheritdoc /> */
+ public void WritePortable(IPortableWriter writer)
+ {
+ var w = writer.RawWriter();
+
+ var writeAware = _rcv as IPortableWriteAware;
+
+ if (writeAware != null)
+ writeAware.WritePortable(writer);
+ else
+ {
+ w.WriteByte(RcvNormal);
+ PortableUtils.WritePortableOrSerializable((PortableWriterImpl) writer, _rcv);
+ }
+ }
+
+ /// <summary>
+ /// Updates cache with batch of entries.
+ /// </summary>
+ /// <param name="grid">The grid.</param>
+ /// <param name="cache">Cache.</param>
+ /// <param name="stream">Stream.</param>
+ /// <param name="keepPortable">Portable flag.</param>
+ public void Receive(Ignite grid, IUnmanagedTarget cache, IPortableStream stream, bool keepPortable)
+ {
+ Debug.Assert(grid != null);
+ Debug.Assert(cache != null);
+ Debug.Assert(stream != null);
+
+ _invoke(_rcv, grid, cache, stream, keepPortable);
+ }
+
+ /// <summary>
+ /// Invokes the receiver.
+ /// </summary>
+ /// <param name="receiver">Receiver.</param>
+ /// <param name="grid">Grid.</param>
+ /// <param name="cache">Cache.</param>
+ /// <param name="stream">Stream.</param>
+ /// <param name="keepPortable">Portable flag.</param>
+ public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache,
+ IPortableStream stream, bool keepPortable)
+ {
+ var reader = grid.Marshaller.StartUnmarshal(stream, keepPortable);
+
+ var size = reader.ReadInt();
+
+ var entries = new List<ICacheEntry<TK, TV>>(size);
+
+ for (var i = 0; i < size; i++)
+ entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
+
+ receiver.Receive(grid.Cache<TK, TV>(cache, keepPortable), entries);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
new file mode 100644
index 0000000..3972bb0
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Handle;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Portable;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Ignite events.
+ /// </summary>
+ internal class Events : PlatformTarget, IEvents
+ {
+ /// <summary>
+ /// Opcodes.
+ /// </summary>
+ protected enum Op
+ {
+ RemoteQuery = 1,
+ RemoteListen = 2,
+ StopRemoteListen = 3,
+ WaitForLocal = 4,
+ LocalQuery = 5,
+ RecordLocal = 6,
+ EnableLocal = 8,
+ DisableLocal = 9,
+ GetEnabledEvents = 10
+ }
+
+ /** Map from user func to local wrapper, needed for invoke/unsubscribe. */
+ private readonly Dictionary<object, Dictionary<int, LocalHandledEventFilter>> _localFilters
+ = new Dictionary<object, Dictionary<int, LocalHandledEventFilter>>();
+
+ /** Grid. */
+ protected readonly Ignite Ignite;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Events"/> class.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="clusterGroup">Cluster group.</param>
+ public Events(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
+ : base(target, marsh)
+ {
+ Debug.Assert(clusterGroup != null);
+
+ ClusterGroup = clusterGroup;
+
+ Ignite = (Ignite) clusterGroup.Ignite;
+ }
+
+ /** <inheritDoc /> */
+ public virtual IEvents WithAsync()
+ {
+ return new EventsAsync(UU.EventsWithAsync(Target), Marshaller, ClusterGroup);
+ }
+
+ /** <inheritDoc /> */
+ public virtual bool IsAsync
+ {
+ get { return false; }
+ }
+
+ /** <inheritDoc /> */
+ public virtual IFuture GetFuture()
+ {
+ throw IgniteUtils.GetAsyncModeDisabledException();
+ }
+
+ /** <inheritDoc /> */
+ public virtual IFuture<TResult> GetFuture<TResult>()
+ {
+ throw IgniteUtils.GetAsyncModeDisabledException();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ClusterGroup { get; private set; }
+
+ /** <inheritDoc /> */
+ public virtual List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
+ where T : IEvent
+ {
+ IgniteArgumentCheck.NotNull(filter, "filter");
+
+ return DoOutInOp((int) Op.RemoteQuery,
+ writer =>
+ {
+ writer.Write(new PortableOrSerializableObjectHolder(filter));
+
+ writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds));
+
+ WriteEventTypes(types, writer);
+ },
+ reader => ReadEvents<T>(reader));
+ }
+
+ /** <inheritDoc /> */
+ public virtual Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+ where T : IEvent
+ {
+ IgniteArgumentCheck.Ensure(bufSize > 0, "bufSize", "should be > 0");
+ IgniteArgumentCheck.Ensure(interval == null || interval.Value.TotalMilliseconds > 0, "interval", "should be null or >= 0");
+
+ return DoOutInOp((int) Op.RemoteListen,
+ writer =>
+ {
+ writer.WriteInt(bufSize);
+ writer.WriteLong((long) (interval == null ? 0 : interval.Value.TotalMilliseconds));
+ writer.WriteBoolean(autoUnsubscribe);
+
+ writer.WriteBoolean(localListener != null);
+
+ if (localListener != null)
+ {
+ var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e));
+ writer.WriteLong(Ignite.HandleRegistry.Allocate(listener));
+ }
+
+ writer.WriteBoolean(remoteFilter != null);
+
+ if (remoteFilter != null)
+ writer.Write(new PortableOrSerializableObjectHolder(remoteFilter));
+
+ WriteEventTypes(types, writer);
+ },
+ reader => Marshaller.StartUnmarshal(reader).ReadGuid() ?? Guid.Empty);
+ }
+
+ /** <inheritDoc /> */
+ public virtual void StopRemoteListen(Guid opId)
+ {
+ DoOutOp((int) Op.StopRemoteListen, writer =>
+ {
+ Marshaller.StartMarshal(writer).WriteGuid(opId);
+ });
+ }
+
+ /** <inheritDoc /> */
+ public IEvent WaitForLocal(params int[] types)
+ {
+ return WaitForLocal<IEvent>(null, types);
+ }
+
+ /** <inheritDoc /> */
+ public virtual T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent
+ {
+ long hnd = 0;
+
+ try
+ {
+ return WaitForLocal0(filter, ref hnd, types);
+ }
+ finally
+ {
+ if (filter != null)
+ Ignite.HandleRegistry.Release(hnd);
+ }
+ }
+
+ /** <inheritDoc /> */
+ public List<IEvent> LocalQuery(params int[] types)
+ {
+ return DoOutInOp((int) Op.LocalQuery,
+ writer => WriteEventTypes(types, writer),
+ reader => ReadEvents<IEvent>(reader));
+ }
+
+ /** <inheritDoc /> */
+ public void RecordLocal(IEvent evt)
+ {
+ throw new NotImplementedException("GG-10244");
+ }
+
+ /** <inheritDoc /> */
+ public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+ {
+ IgniteArgumentCheck.NotNull(listener, "listener");
+ IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+ foreach (var type in types)
+ LocalListen(listener, type);
+ }
+
+ /** <inheritDoc /> */
+ public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent
+ {
+ lock (_localFilters)
+ {
+ Dictionary<int, LocalHandledEventFilter> filters;
+
+ if (!_localFilters.TryGetValue(listener, out filters))
+ return false;
+
+ var success = false;
+
+ // Should do this inside lock to avoid race with subscription
+ // ToArray is required because we are going to modify underlying dictionary during enumeration
+ foreach (var filter in GetLocalFilters(listener, types).ToArray())
+ success |= UU.EventsStopLocalListen(Target, filter.Handle);
+
+ return success;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void EnableLocal(params int[] types)
+ {
+ IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+ DoOutOp((int)Op.EnableLocal, writer => WriteEventTypes(types, writer));
+ }
+
+ /** <inheritDoc /> */
+ public void DisableLocal(params int[] types)
+ {
+ IgniteArgumentCheck.NotNullOrEmpty(types, "types");
+
+ DoOutOp((int)Op.DisableLocal, writer => WriteEventTypes(types, writer));
+ }
+
+ /** <inheritDoc /> */
+ public int[] GetEnabledEvents()
+ {
+ return DoInOp((int)Op.GetEnabledEvents, reader => ReadEventTypes(reader));
+ }
+
+ /** <inheritDoc /> */
+ public bool IsEnabled(int type)
+ {
+ return UU.EventsIsEnabled(Target, type);
+ }
+
+ /// <summary>
+ /// Waits for the specified events.
+ /// </summary>
+ /// <typeparam name="T">Type of events.</typeparam>
+ /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param>
+ /// <param name="handle">The filter handle, if applicable.</param>
+ /// <param name="types">Types of the events to wait for.
+ /// If not provided, all events will be passed to the filter.</param>
+ /// <returns>Ignite event.</returns>
+ protected T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent
+ {
+ if (filter != null)
+ handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter
+ {
+ InvokeFunc = stream => InvokeLocalFilter(stream, filter)
+ });
+
+ var hnd = handle;
+
+ return DoOutInOp((int)Op.WaitForLocal,
+ writer =>
+ {
+ if (filter != null)
+ {
+ writer.WriteBoolean(true);
+ writer.WriteLong(hnd);
+ }
+ else
+ writer.WriteBoolean(false);
+
+ WriteEventTypes(types, writer);
+ },
+ reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader)));
+ }
+
+ /// <summary>
+ /// Reads events from a portable stream.
+ /// </summary>
+ /// <typeparam name="T">Event type.</typeparam>
+ /// <param name="reader">Reader.</param>
+ /// <returns>Resulting list or null.</returns>
+ private List<T> ReadEvents<T>(IPortableStream reader) where T : IEvent
+ {
+ return ReadEvents<T>(Marshaller.StartUnmarshal(reader));
+ }
+
+ /// <summary>
+ /// Reads events from a portable reader.
+ /// </summary>
+ /// <typeparam name="T">Event type.</typeparam>
+ /// <param name="portableReader">Reader.</param>
+ /// <returns>Resulting list or null.</returns>
+ protected static List<T> ReadEvents<T>(PortableReaderImpl portableReader) where T : IEvent
+ {
+ var count = portableReader.RawReader().ReadInt();
+
+ if (count == -1)
+ return null;
+
+ var result = new List<T>(count);
+
+ for (var i = 0; i < count; i++)
+ result.Add(EventReader.Read<T>(portableReader));
+
+ return result;
+ }
+
+ /// <summary>
+ /// Gets local filters by user listener and event type.
+ /// </summary>
+ /// <param name="listener">Listener.</param>
+ /// <param name="types">Types.</param>
+ /// <returns>Collection of local listener wrappers.</returns>
+ [SuppressMessage("ReSharper", "InconsistentlySynchronizedField",
+ Justification = "This private method should be always called within a lock on localFilters")]
+ private IEnumerable<LocalHandledEventFilter> GetLocalFilters(object listener, int[] types)
+ {
+ Dictionary<int, LocalHandledEventFilter> filters;
+
+ if (!_localFilters.TryGetValue(listener, out filters))
+ return Enumerable.Empty<LocalHandledEventFilter>();
+
+ if (types.Length == 0)
+ return filters.Values;
+
+ return types.Select(type =>
+ {
+ LocalHandledEventFilter filter;
+
+ return filters.TryGetValue(type, out filter) ? filter : null;
+ }).Where(x => x != null);
+ }
+
+ /// <summary>
+ /// Adds an event listener for local events.
+ /// </summary>
+ /// <typeparam name="T">Type of events.</typeparam>
+ /// <param name="listener">Predicate that is called on each received event.</param>
+ /// <param name="type">Event type for which this listener will be notified</param>
+ private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent
+ {
+ lock (_localFilters)
+ {
+ Dictionary<int, LocalHandledEventFilter> filters;
+
+ if (!_localFilters.TryGetValue(listener, out filters))
+ {
+ filters = new Dictionary<int, LocalHandledEventFilter>();
+
+ _localFilters[listener] = filters;
+ }
+
+ LocalHandledEventFilter localFilter;
+
+ if (!filters.TryGetValue(type, out localFilter))
+ {
+ localFilter = CreateLocalFilter(listener, type);
+
+ filters[type] = localFilter;
+ }
+
+ UU.EventsLocalListen(Target, localFilter.Handle, type);
+ }
+ }
+
+ /// <summary>
+ /// Creates a user filter wrapper.
+ /// </summary>
+ /// <typeparam name="T">Event object type.</typeparam>
+ /// <param name="listener">Listener.</param>
+ /// <param name="type">Event type.</param>
+ /// <returns>Created wrapper.</returns>
+ private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent
+ {
+ var result = new LocalHandledEventFilter(
+ stream => InvokeLocalFilter(stream, listener),
+ unused =>
+ {
+ lock (_localFilters)
+ {
+ Dictionary<int, LocalHandledEventFilter> filters;
+
+ if (_localFilters.TryGetValue(listener, out filters))
+ {
+ filters.Remove(type);
+
+ if (filters.Count == 0)
+ _localFilters.Remove(listener);
+ }
+ }
+ });
+
+ result.Handle = Ignite.HandleRegistry.Allocate(result);
+
+ return result;
+ }
+
+ /// <summary>
+ /// Invokes local filter using data from specified stream.
+ /// </summary>
+ /// <typeparam name="T">Event object type.</typeparam>
+ /// <param name="stream">The stream.</param>
+ /// <param name="listener">The listener.</param>
+ /// <returns>Filter invocation result.</returns>
+ private bool InvokeLocalFilter<T>(IPortableStream stream, IEventFilter<T> listener) where T : IEvent
+ {
+ var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream));
+
+ // No guid in local mode
+ return listener.Invoke(Guid.Empty, evt);
+ }
+
+ /// <summary>
+ /// Writes the event types.
+ /// </summary>
+ /// <param name="types">Types.</param>
+ /// <param name="writer">Writer.</param>
+ private static void WriteEventTypes(int[] types, IPortableRawWriter writer)
+ {
+ if (types.Length == 0)
+ types = null; // empty array means no type filtering
+
+ writer.WriteIntArray(types);
+ }
+
+ /// <summary>
+ /// Writes the event types.
+ /// </summary>
+ /// <param name="reader">Reader.</param>
+ private int[] ReadEventTypes(IPortableStream reader)
+ {
+ return Marshaller.StartUnmarshal(reader).ReadIntArray();
+ }
+
+ /// <summary>
+ /// Local user filter wrapper.
+ /// </summary>
+ private class LocalEventFilter : IInteropCallback
+ {
+ /** */
+ public Func<IPortableStream, bool> InvokeFunc;
+
+ /** <inheritdoc /> */
+ public int Invoke(IPortableStream stream)
+ {
+ return InvokeFunc(stream) ? 1 : 0;
+ }
+ }
+
+ /// <summary>
+ /// Local user filter wrapper with handle.
+ /// </summary>
+ private class LocalHandledEventFilter : Handle<Func<IPortableStream, bool>>, IInteropCallback
+ {
+ /** */
+ public long Handle;
+
+ /** <inheritdoc /> */
+ public int Invoke(IPortableStream stream)
+ {
+ return Target(stream) ? 1 : 0;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="LocalHandledEventFilter"/> class.
+ /// </summary>
+ /// <param name="invokeFunc">The invoke function.</param>
+ /// <param name="releaseAction">The release action.</param>
+ public LocalHandledEventFilter(
+ Func<IPortableStream, bool> invokeFunc, Action<Func<IPortableStream, bool>> releaseAction)
+ : base(invokeFunc, releaseAction)
+ {
+ // No-op.
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
new file mode 100644
index 0000000..632d8b8
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Async Ignite events.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+ internal class EventsAsync : Events
+ {
+ /** */
+ private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => OpNone);
+
+ /** */
+ private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Events"/> class.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="clusterGroup">Cluster group.</param>
+ public EventsAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup)
+ : base(target, marsh, clusterGroup)
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ public override List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types)
+ {
+ _lastAsyncOp.Value = (int) Op.RemoteQuery;
+
+ var result = base.RemoteQuery(filter, timeout, types);
+
+ // Result is a List<T> so we can't create proper converter later in GetFuture call from user.
+ // ReSharper disable once RedundantTypeArgumentsOfMethod (otherwise won't compile in VS2010 / TC)
+ _curFut.Value = GetFuture<List<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp,
+ (int) Op.RemoteQuery), convertFunc: ReadEvents<T>);
+
+ return result;
+ }
+
+ /** <inheritdoc /> */
+ public override Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true,
+ IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types)
+ {
+ _lastAsyncOp.Value = (int) Op.RemoteListen;
+ _curFut.Value = null;
+
+ return base.RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, types);
+ }
+
+ /** <inheritdoc /> */
+ public override void StopRemoteListen(Guid opId)
+ {
+ _lastAsyncOp.Value = (int) Op.StopRemoteListen;
+ _curFut.Value = null;
+
+ base.StopRemoteListen(opId);
+ }
+
+ /** <inheritdoc /> */
+ public override T WaitForLocal<T>(IEventFilter<T> filter, params int[] types)
+ {
+ _lastAsyncOp.Value = (int) Op.WaitForLocal;
+
+ long hnd = 0;
+
+ try
+ {
+ var result = WaitForLocal0(filter, ref hnd, types);
+
+ if (filter != null)
+ {
+ // Dispose handle as soon as future ends.
+ var fut = GetFuture<T>();
+
+ _curFut.Value = fut;
+
+ fut.Listen(() => Ignite.HandleRegistry.Release(hnd));
+ }
+ else
+ _curFut.Value = null;
+
+ return result;
+ }
+ catch (Exception)
+ {
+ Ignite.HandleRegistry.Release(hnd);
+ throw;
+ }
+ }
+
+ /** <inheritdoc /> */
+ public override IEvents WithAsync()
+ {
+ return this;
+ }
+
+ /** <inheritdoc /> */
+ public override bool IsAsync
+ {
+ get { return true; }
+ }
+
+ /** <inheritdoc /> */
+ public override IFuture GetFuture()
+ {
+ return GetFuture<object>();
+ }
+
+ /** <inheritdoc /> */
+ public override IFuture<T> GetFuture<T>()
+ {
+ if (_curFut.Value != null)
+ {
+ var fut = _curFut.Value;
+ _curFut.Value = null;
+ return (IFuture<T>) fut;
+ }
+
+ Func<PortableReaderImpl, T> converter = null;
+
+ if (_lastAsyncOp.Value == (int) Op.WaitForLocal)
+ converter = reader => (T) EventReader.Read<IEvent>(reader);
+
+ return GetFuture((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, _lastAsyncOp.Value),
+ convertFunc: converter);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
new file mode 100644
index 0000000..8b44966
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Events
+{
+ using System;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+
+ /// <summary>
+ /// Event filter/listener holder for RemoteListen.
+ /// </summary>
+ internal class RemoteListenEventFilter : IInteropCallback
+ {
+ /** */
+ private readonly Ignite _ignite;
+
+ /** */
+ private readonly Func<Guid, IEvent, bool> _filter;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="RemoteListenEventFilter"/> class.
+ /// </summary>
+ /// <param name="ignite">The grid.</param>
+ /// <param name="filter">The filter.</param>
+ public RemoteListenEventFilter(Ignite ignite, Func<Guid, IEvent, bool> filter)
+ {
+ _ignite = ignite;
+ _filter = filter;
+ }
+
+ /** <inheritdoc /> */
+ public int Invoke(IPortableStream stream)
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ var evt = EventReader.Read<IEvent>(reader);
+
+ var nodeId = reader.ReadGuid() ?? Guid.Empty;
+
+ return _filter(nodeId, evt) ? 1 : 0;
+ }
+
+ /// <summary>
+ /// Creates an instance of this class from a stream.
+ /// </summary>
+ /// <param name="memPtr">Memory pointer.</param>
+ /// <param name="grid">Grid</param>
+ /// <returns>Deserialized instance of <see cref="RemoteListenEventFilter"/></returns>
+ public static RemoteListenEventFilter CreateInstance(long memPtr, Ignite grid)
+ {
+ Debug.Assert(grid != null);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var marsh = grid.Marshaller;
+
+ var reader = marsh.StartUnmarshal(stream);
+
+ var pred = reader.ReadObject<PortableOrSerializableObjectHolder>().Item;
+
+ var func = DelegateTypeDescriptor.GetEventFilter(pred.GetType());
+
+ return new RemoteListenEventFilter(grid, (id, evt) => func(pred, id, evt));
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
new file mode 100644
index 0000000..066f345
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Runtime.InteropServices;
+ using System.Security;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Transactions;
+
+ /// <summary>
+ /// Managed environment. Acts as a gateway for native code.
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal static class ExceptionUtils
+ {
+ /** NoClassDefFoundError fully-qualified class name which is important during startup phase. */
+ private const string ClsNoClsDefFoundErr = "java.lang.NoClassDefFoundError";
+
+ /** NoSuchMethodError fully-qualified class name which is important during startup phase. */
+ private const string ClsNoSuchMthdErr = "java.lang.NoSuchMethodError";
+
+ /** InteropCachePartialUpdateException. */
+ private const string ClsCachePartialUpdateErr = "org.apache.ignite.internal.processors.platform.cache.PlatformCachePartialUpdateException";
+
+ /** Map with predefined exceptions. */
+ private static readonly IDictionary<string, ExceptionFactoryDelegate> EXS = new Dictionary<string, ExceptionFactoryDelegate>();
+
+ /** Exception factory delegate. */
+ private delegate Exception ExceptionFactoryDelegate(string msg);
+
+ /// <summary>
+ /// Static initializer.
+ /// </summary>
+ static ExceptionUtils()
+ {
+ // Common Java exceptions mapped to common .Net exceptions.
+ EXS["java.lang.IllegalArgumentException"] = m => new ArgumentException(m);
+ EXS["java.lang.IllegalStateException"] = m => new InvalidOperationException(m);
+ EXS["java.lang.UnsupportedOperationException"] = m => new NotImplementedException(m);
+ EXS["java.lang.InterruptedException"] = m => new ThreadInterruptedException(m);
+
+ // Generic Ignite exceptions.
+ EXS["org.apache.ignite.IgniteException"] = m => new IgniteException(m);
+ EXS["org.apache.ignite.IgniteCheckedException"] = m => new IgniteException(m);
+
+ // Cluster exceptions.
+ EXS["org.apache.ignite.cluster.ClusterGroupEmptyException"] = m => new ClusterGroupEmptyException(m);
+ EXS["org.apache.ignite.cluster.ClusterTopologyException"] = m => new ClusterTopologyException(m);
+
+ // Compute exceptions.
+ EXS["org.apache.ignite.compute.ComputeExecutionRejectedException"] = m => new ComputeExecutionRejectedException(m);
+ EXS["org.apache.ignite.compute.ComputeJobFailoverException"] = m => new ComputeJobFailoverException(m);
+ EXS["org.apache.ignite.compute.ComputeTaskCancelledException"] = m => new ComputeTaskCancelledException(m);
+ EXS["org.apache.ignite.compute.ComputeTaskTimeoutException"] = m => new ComputeTaskTimeoutException(m);
+ EXS["org.apache.ignite.compute.ComputeUserUndeclaredException"] = m => new ComputeUserUndeclaredException(m);
+
+ // Cache exceptions.
+ EXS["javax.cache.CacheException"] = m => new CacheException(m);
+ EXS["javax.cache.integration.CacheLoaderException"] = m => new CacheStoreException(m);
+ EXS["javax.cache.integration.CacheWriterException"] = m => new CacheStoreException(m);
+ EXS["javax.cache.processor.EntryProcessorException"] = m => new CacheEntryProcessorException(m);
+ EXS["org.apache.ignite.cache.CacheAtomicUpdateTimeoutException"] = m => new CacheAtomicUpdateTimeoutException(m);
+
+ // Transaction exceptions.
+ EXS["org.apache.ignite.transactions.TransactionOptimisticException"] = m => new TransactionOptimisticException(m);
+ EXS["org.apache.ignite.transactions.TransactionTimeoutException"] = m => new TransactionTimeoutException(m);
+ EXS["org.apache.ignite.transactions.TransactionRollbackException"] = m => new TransactionRollbackException(m);
+ EXS["org.apache.ignite.transactions.TransactionHeuristicException"] = m => new TransactionHeuristicException(m);
+
+ // Security exceptions.
+ EXS["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m);
+ EXS["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m);
+ }
+
+ /// <summary>
+ /// Creates exception according to native code class and message.
+ /// </summary>
+ /// <param name="clsName">Exception class name.</param>
+ /// <param name="msg">Exception message.</param>
+ /// <param name="reader">Error data reader.</param>
+ public static Exception GetException(string clsName, string msg, PortableReaderImpl reader = null)
+ {
+ ExceptionFactoryDelegate ctor;
+
+ if (EXS.TryGetValue(clsName, out ctor))
+ return ctor(msg);
+
+ if (ClsNoClsDefFoundErr.Equals(clsName))
+ return new IgniteException("Java class is not found (did you set IGNITE_HOME environment " +
+ "variable?): " + msg);
+
+ if (ClsNoSuchMthdErr.Equals(clsName))
+ return new IgniteException("Java class method is not found (did you set IGNITE_HOME environment " +
+ "variable?): " + msg);
+
+ if (ClsCachePartialUpdateErr.Equals(clsName))
+ return ProcessCachePartialUpdateException(msg, reader);
+
+ return new IgniteException("Java exception occurred [class=" + clsName + ", message=" + msg + ']');
+ }
+
+ /// <summary>
+ /// Process cache partial update exception.
+ /// </summary>
+ /// <param name="msg">Message.</param>
+ /// <param name="reader">Reader.</param>
+ /// <returns></returns>
+ private static Exception ProcessCachePartialUpdateException(string msg, PortableReaderImpl reader)
+ {
+ if (reader == null)
+ return new CachePartialUpdateException(msg, new IgniteException("Failed keys are not available."));
+
+ bool dataExists = reader.ReadBoolean();
+
+ Debug.Assert(dataExists);
+
+ if (reader.ReadBoolean())
+ {
+ bool keepPortable = reader.ReadBoolean();
+
+ PortableReaderImpl keysReader = reader.Marshaller.StartUnmarshal(reader.Stream, keepPortable);
+
+ try
+ {
+ return new CachePartialUpdateException(msg, ReadNullableList(keysReader));
+ }
+ catch (Exception e)
+ {
+ // Failed to deserialize data.
+ return new CachePartialUpdateException(msg, e);
+ }
+ }
+
+ // Was not able to write keys.
+ string innerErrCls = reader.ReadString();
+ string innerErrMsg = reader.ReadString();
+
+ Exception innerErr = GetException(innerErrCls, innerErrMsg);
+
+ return new CachePartialUpdateException(msg, innerErr);
+ }
+
+ /// <summary>
+ /// Create JVM initialization exception.
+ /// </summary>
+ /// <param name="clsName">Class name.</param>
+ /// <param name="msg">Message.</param>
+ /// <returns>Exception.</returns>
+ public static Exception GetJvmInitializeException(string clsName, string msg)
+ {
+ if (clsName != null)
+ return new IgniteException("Failed to initialize JVM.", GetException(clsName, msg));
+
+ if (msg != null)
+ return new IgniteException("Failed to initialize JVM: " + msg);
+
+ return new IgniteException("Failed to initialize JVM.");
+ }
+
+ /// <summary>
+ /// Reads nullable list.
+ /// </summary>
+ /// <param name="reader">Reader.</param>
+ /// <returns>List.</returns>
+ private static List<object> ReadNullableList(PortableReaderImpl reader)
+ {
+ if (!reader.ReadBoolean())
+ return null;
+
+ var size = reader.ReadInt();
+
+ var list = new List<object>(size);
+
+ for (int i = 0; i < size; i++)
+ list.Add(reader.ReadObject<object>());
+
+ return list;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/Handle.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/Handle.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/Handle.cs
new file mode 100644
index 0000000..0168963
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/Handle.cs
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Handle
+{
+ using System;
+ using System.Threading;
+
+ /// <summary>
+ /// Wrapper over some resource ensuring it's release.
+ /// </summary>
+ public class Handle<T> : IHandle
+ {
+ /** Target.*/
+ private readonly T _target;
+
+ /** Release action. */
+ private readonly Action<T> _releaseAction;
+
+ /** Release flag. */
+ private int _released;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="releaseAction">Release action.</param>
+ public Handle(T target, Action<T> releaseAction)
+ {
+ _target = target;
+ _releaseAction = releaseAction;
+ }
+
+ /// <summary>
+ /// Target.
+ /// </summary>
+ public T Target
+ {
+ get { return _target; }
+ }
+
+ /** <inheritdoc /> */
+ public void Release()
+ {
+ if (Interlocked.CompareExchange(ref _released, 1, 0) == 0)
+ _releaseAction(_target);
+ }
+
+ /** <inheritdoc /> */
+ public bool Released
+ {
+ get { return Thread.VolatileRead(ref _released) == 1; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
new file mode 100644
index 0000000..9c8178f
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Handle
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+
+ /// <summary>
+ /// Resource registry.
+ /// </summary>
+ public class HandleRegistry
+ {
+ /** Default critical resources capacity. */
+ internal const int DfltFastCap = 1024;
+
+ /** Array for fast-path. */
+ private readonly object[] _fast;
+
+ /** Dictionery for slow-path. */
+ private readonly ConcurrentDictionary<long, object> _slow;
+
+ /** Capacity of fast array. */
+ private readonly int _fastCap;
+
+ /** Counter for fast-path. */
+ private int _fastCtr;
+
+ /** Counter for slow-path. */
+ private long _slowCtr;
+
+ /** Close flag. */
+ private int _closed;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ public HandleRegistry() : this(DfltFastCap)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="fastCap">Amount of critical resources this registry can allocate in "fast" mode.</param>
+ public HandleRegistry(int fastCap)
+ {
+ _fastCap = fastCap;
+ _fast = new object[fastCap];
+
+ _slow = new ConcurrentDictionary<long, object>();
+ _slowCtr = fastCap;
+ }
+
+ /// <summary>
+ /// Allocate a handle for resource.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <returns>Pointer.</returns>
+ public long Allocate(object target)
+ {
+ return Allocate0(target, false, false);
+ }
+
+ /// <summary>
+ /// Allocate a handle in safe mode.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <returns>Pointer.</returns>
+ public long AllocateSafe(object target)
+ {
+ return Allocate0(target, false, true);
+ }
+
+ /// <summary>
+ /// Allocate a handle for critical resource.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <returns>Pointer.</returns>
+ public long AllocateCritical(object target)
+ {
+ return Allocate0(target, true, false);
+ }
+
+ /// <summary>
+ /// Allocate a handle for critical resource in safe mode.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <returns>Pointer.</returns>
+ public long AllocateCriticalSafe(object target)
+ {
+ return Allocate0(target, true, true);
+ }
+
+ /// <summary>
+ /// Internal allocation routine.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="critical">Critical flag.</param>
+ /// <param name="safe">Safe flag.</param>
+ /// <returns>Pointer.</returns>
+ private long Allocate0(object target, bool critical, bool safe)
+ {
+ if (Closed)
+ throw ClosedException();
+
+ // Try allocating on critical path.
+ if (critical)
+ {
+ if (_fastCtr < _fastCap) // Non-volatile read could yield in old value, but increment resolves this.
+ {
+ int fastIdx = Interlocked.Increment(ref _fastCtr);
+
+ if (fastIdx < _fastCap)
+ {
+ Thread.VolatileWrite(ref _fast[fastIdx], target);
+
+ if (safe && Closed)
+ {
+ Thread.VolatileWrite(ref _fast[fastIdx], null);
+
+ Release0(target, true);
+
+ throw ClosedException();
+ }
+
+ return fastIdx;
+ }
+ }
+ }
+
+ // Critical allocation failed, fallback to slow mode.
+ long slowIdx = Interlocked.Increment(ref _slowCtr);
+
+ _slow[slowIdx] = target;
+
+ if (safe && Closed)
+ {
+ _slow[slowIdx] = null;
+
+ Release0(target, true);
+
+ throw ClosedException();
+ }
+
+ return slowIdx;
+ }
+
+
+ /// <summary>
+ /// Release handle.
+ /// </summary>
+ /// <param name="id">Identifier.</param>
+ /// <param name="quiet">Whether release must be quiet or not.</param>
+ public void Release(long id, bool quiet = false)
+ {
+ if (id < _fastCap)
+ {
+ object target = Thread.VolatileRead(ref _fast[id]);
+
+ if (target != null)
+ {
+ Thread.VolatileWrite(ref _fast[id], null);
+
+ Release0(target, quiet);
+ }
+ }
+ else
+ {
+ object target;
+
+ if (_slow.TryRemove(id, out target))
+ Release0(target, quiet);
+ }
+ }
+
+ /// <summary>
+ /// Internal release routine.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="quiet">Whether release must be quiet or not.</param>
+ private static void Release0(object target, bool quiet)
+ {
+ IHandle target0 = target as IHandle;
+
+ if (target0 != null)
+ {
+ if (quiet)
+ {
+ try
+ {
+ target0.Release();
+ }
+ catch (Exception)
+ {
+ // No-op.
+ }
+ }
+ else
+ target0.Release();
+ }
+ }
+
+ /// <summary>
+ /// Gets handle target.
+ /// </summary>
+ /// <param name="id">Identifier.</param>
+ /// <returns>Target.</returns>
+ public T Get<T>(long id)
+ {
+ return Get<T>(id, false);
+ }
+
+ /// <summary>
+ /// Gets handle target.
+ /// </summary>
+ /// <param name="id">Identifier.</param>
+ /// <param name="throwOnAbsent">Whether to throw an exception if resource is not found.</param>
+ /// <returns>Target.</returns>
+ public T Get<T>(long id, bool throwOnAbsent)
+ {
+ object target;
+
+ if (id < _fastCap)
+ {
+ target = Thread.VolatileRead(ref _fast[id]);
+
+ if (target != null)
+ return (T)target;
+ }
+ else
+ {
+ if (_slow.TryGetValue(id, out target))
+ return (T) target;
+ }
+
+ if (throwOnAbsent)
+ throw new InvalidOperationException("Resource handle has been released (is Ignite stopping?).");
+
+ return default(T);
+ }
+
+ /// <summary>
+ /// Close the registry. All resources allocated at the moment of close are
+ /// guaranteed to be released.
+ /// </summary>
+ public void Close()
+ {
+ if (Interlocked.CompareExchange(ref _closed, 1, 0) == 0)
+ {
+ // Cleanup on fast-path.
+ for (int i = 0; i < _fastCap; i++)
+ {
+ object target = Thread.VolatileRead(ref _fast[i]);
+
+ if (target != null)
+ {
+ Thread.VolatileWrite(ref _fast[i], null);
+
+ Release0(target, true);
+ }
+ }
+
+ // Cleanup on slow-path.
+ foreach (var item in _slow)
+ {
+ object target = item.Value;
+
+ if (target != null)
+ Release0(target, true);
+ }
+
+ _slow.Clear();
+ }
+ }
+
+ /// <summary>
+ /// Closed flag.
+ /// </summary>
+ public bool Closed
+ {
+ get { return Thread.VolatileRead(ref _closed) == 1; }
+ }
+
+ /// <summary>
+ /// Gets the current handle count.
+ /// </summary>
+ public int Count
+ {
+ get
+ {
+ Thread.MemoryBarrier();
+
+ return _fast.Count(x => x != null) + _slow.Count;
+ }
+ }
+
+ /// <summary>
+ /// Gets a snapshot of currently referenced objects list.
+ /// </summary>
+ public List<KeyValuePair<long, object>> GetItems()
+ {
+ Thread.MemoryBarrier();
+
+ return
+ _fast.Select((x, i) => new KeyValuePair<long, object>(i, x))
+ .Where(x => x.Value != null)
+ .Concat(_slow)
+ .ToList();
+ }
+
+ /// <summary>
+ /// Create new exception for closed state.
+ /// </summary>
+ /// <returns>Exception.</returns>
+ private static Exception ClosedException()
+ {
+ return new InvalidOperationException("Cannot allocate a resource handle because Ignite is stopping.");
+ }
+ }
+}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/IHandle.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/IHandle.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/IHandle.cs
new file mode 100644
index 0000000..d147f8b
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Handle/IHandle.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Handle
+{
+ /// <summary>
+ /// Wrapper over some resource ensuring it's release.
+ /// </summary>
+ public interface IHandle
+ {
+ /// <summary>
+ /// Release the resource.
+ /// </summary>
+ void Release();
+
+ /// <summary>
+ /// Resource released flag.
+ /// </summary>
+ bool Released { get; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
new file mode 100644
index 0000000..91838d0
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using Apache.Ignite.Core.Impl.Portable.IO;
+
+ /// <summary>
+ /// Interop callback.
+ /// </summary>
+ internal interface IInteropCallback
+ {
+ /// <summary>
+ /// Invokes callback.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <returns>Invocation result.</returns>
+ int Invoke(IPortableStream stream);
+ }
+}
\ No newline at end of file