You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/04 18:27:57 UTC
[43/55] [abbrv] ignite git commit: IGNITE-1348: Moved GridGain's .Net
module to Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
new file mode 100644
index 0000000..0f4b5a3
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Cache.Query;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Abstract query cursor implementation.
+ /// </summary>
+ internal abstract class AbstractQueryCursor<T> : PlatformDisposableTarget, IQueryCursor<T>, IEnumerator<T>
+ {
+ /** */
+ private const int OpGetAll = 1;
+
+ /** */
+ private const int OpGetBatch = 2;
+
+ /** Position before head. */
+ private const int BatchPosBeforeHead = -1;
+
+ /** Keep portable flag. */
+ private readonly bool _keepPortable;
+
+ /** Wherther "GetAll" was called. */
+ private bool _getAllCalled;
+
+ /** Whether "GetEnumerator" was called. */
+ private bool _iterCalled;
+
+ /** Batch with entries. */
+ private T[] _batch;
+
+ /** Current position in batch. */
+ private int _batchPos = BatchPosBeforeHead;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ protected AbstractQueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable) :
+ base(target, marsh)
+ {
+ _keepPortable = keepPortable;
+ }
+
+ #region Public methods
+
+ /** <inheritdoc /> */
+ public IList<T> GetAll()
+ {
+ ThrowIfDisposed();
+
+ if (_iterCalled)
+ throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " +
+ "method has already been called.");
+
+ if (_getAllCalled)
+ throw new InvalidOperationException("Failed to get all entries because GetAll() " +
+ "method has already been called.");
+
+ var res = DoInOp<IList<T>>(OpGetAll, ConvertGetAll);
+
+ _getAllCalled = true;
+
+ return res;
+ }
+
+ /** <inheritdoc /> */
+ protected override void Dispose(bool disposing)
+ {
+ try
+ {
+ UU.QueryCursorClose(Target);
+ }
+ finally
+ {
+ base.Dispose(disposing);
+ }
+ }
+
+ #endregion
+
+ #region Public IEnumerable methods
+
+ /** <inheritdoc /> */
+ [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
+ public IEnumerator<T> GetEnumerator()
+ {
+ ThrowIfDisposed();
+
+ if (_iterCalled)
+ throw new InvalidOperationException("Failed to get enumerator entries because " +
+ "GetEnumeartor() method has already been called.");
+
+ if (_getAllCalled)
+ throw new InvalidOperationException("Failed to get enumerator entries because " +
+ "GetAll() method has already been called.");
+
+ UU.QueryCursorIterator(Target);
+
+ _iterCalled = true;
+
+ return this;
+ }
+
+ /** <inheritdoc /> */
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return GetEnumerator();
+ }
+
+ #endregion
+
+ #region Public IEnumerator methods
+
+ /** <inheritdoc /> */
+ public T Current
+ {
+ get
+ {
+ ThrowIfDisposed();
+
+ if (_batchPos == BatchPosBeforeHead)
+ throw new InvalidOperationException("MoveNext has not been called.");
+
+ if (_batch == null)
+ throw new InvalidOperationException("Previous call to MoveNext returned false.");
+
+ return _batch[_batchPos];
+ }
+ }
+
+ /** <inheritdoc /> */
+ object IEnumerator.Current
+ {
+ get { return Current; }
+ }
+
+ /** <inheritdoc /> */
+ public bool MoveNext()
+ {
+ ThrowIfDisposed();
+
+ if (_batch == null)
+ {
+ if (_batchPos == BatchPosBeforeHead)
+ // Standing before head, let's get batch and advance position.
+ RequestBatch();
+ }
+ else
+ {
+ _batchPos++;
+
+ if (_batch.Length == _batchPos)
+ // Reached batch end => request another.
+ RequestBatch();
+ }
+
+ return _batch != null;
+ }
+
+ /** <inheritdoc /> */
+ public void Reset()
+ {
+ throw new NotSupportedException("Reset is not supported.");
+ }
+
+ #endregion
+
+ #region Non-public methods
+
+ /// <summary>
+ /// Read entry from the reader.
+ /// </summary>
+ /// <param name="reader">Reader.</param>
+ /// <returns>Entry.</returns>
+ protected abstract T Read(PortableReaderImpl reader);
+
+ /** <inheritdoc /> */
+ protected override T1 Unmarshal<T1>(IPortableStream stream)
+ {
+ return Marshaller.Unmarshal<T1>(stream, _keepPortable);
+ }
+
+ /// <summary>
+ /// Request next batch.
+ /// </summary>
+ private void RequestBatch()
+ {
+ _batch = DoInOp<T[]>(OpGetBatch, ConvertGetBatch);
+
+ _batchPos = 0;
+ }
+
+ /// <summary>
+ /// Converter for GET_ALL operation.
+ /// </summary>
+ /// <param name="stream">Portable stream.</param>
+ /// <returns>Result.</returns>
+ private IList<T> ConvertGetAll(IPortableStream stream)
+ {
+ var reader = Marshaller.StartUnmarshal(stream, _keepPortable);
+
+ var size = reader.ReadInt();
+
+ var res = new List<T>(size);
+
+ for (var i = 0; i < size; i++)
+ res.Add(Read(reader));
+
+ return res;
+ }
+
+ /// <summary>
+ /// Converter for GET_BATCH operation.
+ /// </summary>
+ /// <param name="stream">Portable stream.</param>
+ /// <returns>Result.</returns>
+ private T[] ConvertGetBatch(IPortableStream stream)
+ {
+ var reader = Marshaller.StartUnmarshal(stream, _keepPortable);
+
+ var size = reader.ReadInt();
+
+ if (size == 0)
+ return null;
+
+ var res = new T[size];
+
+ for (var i = 0; i < size; i++)
+ res[i] = Read(reader);
+
+ return res;
+ }
+
+ #endregion
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs
new file mode 100644
index 0000000..5738ed9
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
+{
+ using Apache.Ignite.Core.Cache.Event;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using CQU = ContinuousQueryUtils;
+
+ /// <summary>
+ /// Continuous query filter interface. Required to hide generic nature of underliyng real filter.
+ /// </summary>
+ internal interface IContinuousQueryFilter
+ {
+ /// <summary>
+ /// Evaluate filter.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <returns>Result.</returns>
+ bool Evaluate(IPortableStream stream);
+
+ /// <summary>
+ /// Inject grid.
+ /// </summary>
+ /// <param name="grid"></param>
+ void Inject(Ignite grid);
+
+ /// <summary>
+ /// Allocate handle for the filter.
+ /// </summary>
+ /// <returns></returns>
+ long Allocate();
+
+ /// <summary>
+ /// Release filter.
+ /// </summary>
+ void Release();
+ }
+
+ /// <summary>
+ /// Continuous query filter generic implementation.
+ /// </summary>
+ internal class ContinuousQueryFilter<TK, TV> : IContinuousQueryFilter
+ {
+ /** Actual filter. */
+ private readonly ICacheEntryEventFilter<TK, TV> _filter;
+
+ /** Keep portable flag. */
+ private readonly bool _keepPortable;
+
+ /** Ignite hosting the filter. */
+ private volatile Ignite _ignite;
+
+ /** GC handle. */
+ private long? _hnd;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="filter">Actual filter.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ public ContinuousQueryFilter(ICacheEntryEventFilter<TK, TV> filter, bool keepPortable)
+ {
+ _filter = filter;
+ _keepPortable = keepPortable;
+ }
+
+ /** <inheritDoc /> */
+ public bool Evaluate(IPortableStream stream)
+ {
+ ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _ignite.Marshaller, _keepPortable);
+
+ return _filter.Evaluate(evt);
+ }
+
+ /** <inheritDoc /> */
+ public void Inject(Ignite grid)
+ {
+ _ignite = grid;
+
+ ResourceProcessor.Inject(_filter, grid);
+ }
+
+ /** <inheritDoc /> */
+ public long Allocate()
+ {
+ lock (this)
+ {
+ if (!_hnd.HasValue)
+ _hnd = _ignite.HandleRegistry.Allocate(this);
+
+ return _hnd.Value;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public void Release()
+ {
+ lock (this)
+ {
+ if (_hnd.HasValue)
+ {
+ _ignite.HandleRegistry.Release(_hnd.Value);
+
+ _hnd = null;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs
new file mode 100644
index 0000000..65da674
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
+{
+ using System;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Continuous query remote filter holder. Wraps real filter into portable object,
+ /// so that it can be passed over wire to another node.
+ /// </summary>
+ public class ContinuousQueryFilterHolder : IPortableWriteAware
+ {
+ /** Key type. */
+ private readonly Type _keyTyp;
+
+ /** Value type. */
+ private readonly Type _valTyp;
+
+ /** Filter object. */
+ private readonly object _filter;
+
+ /** Keep portable flag. */
+ private readonly bool _keepPortable;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="keyTyp">Key type.</param>
+ /// <param name="valTyp">Value type.</param>
+ /// <param name="filter">Filter.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ public ContinuousQueryFilterHolder(Type keyTyp, Type valTyp, object filter, bool keepPortable)
+ {
+ _keyTyp = keyTyp;
+ _valTyp = valTyp;
+ _filter = filter;
+ _keepPortable = keepPortable;
+ }
+
+ /// <summary>
+ /// Key type.
+ /// </summary>
+ internal Type KeyType
+ {
+ get { return _keyTyp; }
+ }
+
+ /// <summary>
+ /// Value type.
+ /// </summary>
+ internal Type ValueType
+ {
+ get { return _valTyp; }
+ }
+
+ /// <summary>
+ /// Filter.
+ /// </summary>
+ internal object Filter
+ {
+ get { return _filter; }
+ }
+
+ /// <summary>
+ /// Keep portable flag.
+ /// </summary>
+ internal bool KeepPortable
+ {
+ get { return _keepPortable; }
+ }
+
+ /// <summary>
+ /// Writes this object to the given writer.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ public void WritePortable(IPortableWriter writer)
+ {
+ PortableWriterImpl rawWriter = (PortableWriterImpl) writer.RawWriter();
+
+ PortableUtils.WritePortableOrSerializable(rawWriter, _keyTyp);
+ PortableUtils.WritePortableOrSerializable(rawWriter, _valTyp);
+ PortableUtils.WritePortableOrSerializable(rawWriter, _filter);
+
+ rawWriter.WriteBoolean(_keepPortable);
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ContinuousQueryFilterHolder"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ContinuousQueryFilterHolder(IPortableReader reader)
+ {
+ PortableReaderImpl rawReader = (PortableReaderImpl) reader.RawReader();
+
+ _keyTyp = PortableUtils.ReadPortableOrSerializable<Type>(rawReader);
+ _valTyp = PortableUtils.ReadPortableOrSerializable<Type>(rawReader);
+ _filter = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+ _keepPortable = rawReader.ReadBoolean();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
new file mode 100644
index 0000000..7a1b544
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
+{
+ using System;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Event;
+ using Apache.Ignite.Core.Cache.Query;
+ using Apache.Ignite.Core.Cache.Query.Continuous;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+ using CQU = ContinuousQueryUtils;
+
+ /// <summary>
+ /// Continuous query handle interface.
+ /// </summary>
+ internal interface IContinuousQueryHandleImpl : IDisposable
+ {
+ /// <summary>
+ /// Process callback.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <returns>Result.</returns>
+ void Apply(IPortableStream stream);
+ }
+
+ /// <summary>
+ /// Continuous query handle.
+ /// </summary>
+ internal class ContinuousQueryHandleImpl<TK, TV> : IContinuousQueryHandleImpl, IContinuousQueryFilter,
+ IContinuousQueryHandle<ICacheEntry<TK, TV>>
+ {
+ /** Marshaller. */
+ private readonly PortableMarshaller _marsh;
+
+ /** Keep portable flag. */
+ private readonly bool _keepPortable;
+
+ /** Real listener. */
+ private readonly ICacheEntryEventListener<TK, TV> _lsnr;
+
+ /** Real filter. */
+ private readonly ICacheEntryEventFilter<TK, TV> _filter;
+
+ /** GC handle. */
+ private long _hnd;
+
+ /** Native query. */
+ private volatile IUnmanagedTarget _nativeQry;
+
+ /** Initial query cursor. */
+ private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor;
+
+ /** Disposed flag. */
+ private bool _disposed;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="qry">Query.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, PortableMarshaller marsh, bool keepPortable)
+ {
+ _marsh = marsh;
+ _keepPortable = keepPortable;
+
+ _lsnr = qry.Listener;
+ _filter = qry.Filter;
+ }
+
+ /// <summary>
+ /// Start execution.
+ /// </summary>
+ /// <param name="grid">Ignite instance.</param>
+ /// <param name="writer">Writer.</param>
+ /// <param name="cb">Callback invoked when all necessary data is written to stream.</param>
+ /// <param name="qry">Query.</param>
+ public void Start(Ignite grid, PortableWriterImpl writer, Func<IUnmanagedTarget> cb,
+ ContinuousQuery<TK, TV> qry)
+ {
+ // 1. Inject resources.
+ ResourceProcessor.Inject(_lsnr, grid);
+ ResourceProcessor.Inject(_filter, grid);
+
+ // 2. Allocate handle.
+ _hnd = grid.HandleRegistry.Allocate(this);
+
+ // 3. Write data to stream.
+ writer.WriteLong(_hnd);
+ writer.WriteBoolean(qry.Local);
+ writer.WriteBoolean(_filter != null);
+
+ ContinuousQueryFilterHolder filterHolder = _filter == null || qry.Local ? null :
+ new ContinuousQueryFilterHolder(typeof (TK), typeof (TV), _filter, _keepPortable);
+
+ writer.WriteObject(filterHolder);
+
+ writer.WriteInt(qry.BufferSize);
+ writer.WriteLong((long)qry.TimeInterval.TotalMilliseconds);
+ writer.WriteBoolean(qry.AutoUnsubscribe);
+
+ // 4. Call Java.
+ _nativeQry = cb();
+
+ // 5. Initial query.
+ var nativeInitialQryCur = UU.ContinuousQueryGetInitialQueryCursor(_nativeQry);
+ _initialQueryCursor = nativeInitialQryCur == null
+ ? null
+ : new QueryCursor<TK, TV>(nativeInitialQryCur, _marsh, _keepPortable);
+ }
+
+ /** <inheritdoc /> */
+ public void Apply(IPortableStream stream)
+ {
+ ICacheEntryEvent<TK, TV>[] evts = CQU.ReadEvents<TK, TV>(stream, _marsh, _keepPortable);
+
+ _lsnr.OnEvent(evts);
+ }
+
+ /** <inheritdoc /> */
+ public bool Evaluate(IPortableStream stream)
+ {
+ Debug.Assert(_filter != null, "Evaluate should not be called if filter is not set.");
+
+ ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _marsh, _keepPortable);
+
+ return _filter.Evaluate(evt);
+ }
+
+ /** <inheritdoc /> */
+ public void Inject(Ignite grid)
+ {
+ throw new NotSupportedException("Should not be called.");
+ }
+
+ /** <inheritdoc /> */
+ public long Allocate()
+ {
+ throw new NotSupportedException("Should not be called.");
+ }
+
+ /** <inheritdoc /> */
+ public void Release()
+ {
+ _marsh.Ignite.HandleRegistry.Release(_hnd);
+ }
+
+ /** <inheritdoc /> */
+ public IQueryCursor<ICacheEntry<TK, TV>> InitialQueryCursor
+ {
+ get { return GetInitialQueryCursor(); }
+ }
+
+ /** <inheritdoc /> */
+ public IQueryCursor<ICacheEntry<TK, TV>> GetInitialQueryCursor()
+ {
+ lock (this)
+ {
+ if (_disposed)
+ throw new ObjectDisposedException("Continuous query handle has been disposed.");
+
+ var cur = _initialQueryCursor;
+
+ if (cur == null)
+ throw new InvalidOperationException("GetInitialQueryCursor() can be called only once.");
+
+ _initialQueryCursor = null;
+
+ return cur;
+ }
+ }
+
+ /** <inheritdoc /> */
+ public void Dispose()
+ {
+ lock (this)
+ {
+ if (_disposed)
+ return;
+
+ Debug.Assert(_nativeQry != null);
+
+ try
+ {
+ UU.ContinuousQueryClose(_nativeQry);
+ }
+ finally
+ {
+ _nativeQry.Dispose();
+
+ _disposed = true;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs
new file mode 100644
index 0000000..86c8300
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
+{
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using Apache.Ignite.Core.Cache.Event;
+ using Apache.Ignite.Core.Impl.Cache.Event;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+
+ /// <summary>
+ /// Utility methods for continuous queries.
+ /// </summary>
+ static class ContinuousQueryUtils
+ {
+ /// <summary>
+ /// Read single event.
+ /// </summary>
+ /// <param name="stream">Stream to read data from.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ /// <returns>Event.</returns>
+ public static ICacheEntryEvent<TK, TV> ReadEvent<TK, TV>(IPortableStream stream,
+ PortableMarshaller marsh, bool keepPortable)
+ {
+ var reader = marsh.StartUnmarshal(stream, keepPortable);
+
+ return ReadEvent0<TK, TV>(reader);
+ }
+
+ /// <summary>
+ /// Read multiple events.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="keepPortable">Keep portable flag.</param>
+ /// <returns>Events.</returns>
+ [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
+ public static ICacheEntryEvent<TK, TV>[] ReadEvents<TK, TV>(IPortableStream stream,
+ PortableMarshaller marsh, bool keepPortable)
+ {
+ var reader = marsh.StartUnmarshal(stream, keepPortable);
+
+ int cnt = reader.ReadInt();
+
+ ICacheEntryEvent<TK, TV>[] evts = new ICacheEntryEvent<TK, TV>[cnt];
+
+ for (int i = 0; i < cnt; i++)
+ evts[i] = ReadEvent0<TK, TV>(reader);
+
+ return evts;
+ }
+
+ /// <summary>
+ /// Read event.
+ /// </summary>
+ /// <param name="reader">Reader.</param>
+ /// <returns>Event.</returns>
+ private static ICacheEntryEvent<TK, TV> ReadEvent0<TK, TV>(PortableReaderImpl reader)
+ {
+ reader.DetachNext();
+ TK key = reader.ReadObject<TK>();
+
+ reader.DetachNext();
+ TV oldVal = reader.ReadObject<TV>();
+
+ reader.DetachNext();
+ TV val = reader.ReadObject<TV>();
+
+ return CreateEvent(key, oldVal, val);
+ }
+
+ /// <summary>
+ /// Create event.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <param name="oldVal">Old value.</param>
+ /// <param name="val">Value.</param>
+ /// <returns>Event.</returns>
+ public static ICacheEntryEvent<TK, TV> CreateEvent<TK, TV>(TK key, TV oldVal, TV val)
+ {
+ if (oldVal == null)
+ {
+ Debug.Assert(val != null);
+
+ return new CacheEntryCreateEvent<TK, TV>(key, val);
+ }
+
+ if (val == null)
+ {
+ Debug.Assert(oldVal != null);
+
+ return new CacheEntryRemoveEvent<TK, TV>(key, oldVal);
+ }
+
+ return new CacheEntryUpdateEvent<TK, TV>(key, oldVal, val);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
new file mode 100644
index 0000000..f38346c
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query
+{
+ using System.Collections;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+
+ /// <summary>
+ /// Cursor for entry-based queries.
+ /// </summary>
+ internal class FieldsQueryCursor : AbstractQueryCursor<IList>
+ {
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaler.</param>
+ /// <param name="keepPortable">Keep poratble flag.</param>
+ public FieldsQueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable)
+ : base(target, marsh, keepPortable)
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ protected override IList Read(PortableReaderImpl reader)
+ {
+ int cnt = reader.ReadInt();
+
+ var res = new ArrayList(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res.Add(reader.ReadObject<object>());
+
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
new file mode 100644
index 0000000..0b113f5
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query
+{
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+
+ /// <summary>
+ /// Cursor for entry-based queries.
+ /// </summary>
+ internal class QueryCursor<TK, TV> : AbstractQueryCursor<ICacheEntry<TK, TV>>
+ {
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaler.</param>
+ /// <param name="keepPortable">Keep poratble flag.</param>
+ public QueryCursor(IUnmanagedTarget target, PortableMarshaller marsh,
+ bool keepPortable) : base(target, marsh, keepPortable)
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ protected override ICacheEntry<TK, TV> Read(PortableReaderImpl reader)
+ {
+ TK key = reader.ReadObject<TK>();
+ TV val = reader.ReadObject<TV>();
+
+ return new CacheEntry<TK, TV>(key, val);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
new file mode 100644
index 0000000..3fbc705
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Store
+{
+ using System.Collections;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Cache.Store;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Handle;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Resource;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Interop cache store.
+ /// </summary>
+ internal class CacheStore
+ {
+ /** */
+ private const byte OpLoadCache = 0;
+
+ /** */
+ private const byte OpLoad = 1;
+
+ /** */
+ private const byte OpLoadAll = 2;
+
+ /** */
+ private const byte OpPut = 3;
+
+ /** */
+ private const byte OpPutAll = 4;
+
+ /** */
+ private const byte OpRmv = 5;
+
+ /** */
+ private const byte OpRmvAll = 6;
+
+ /** */
+ private const byte OpSesEnd = 7;
+
+ /** */
+ private readonly bool _convertPortable;
+
+ /** Store. */
+ private readonly ICacheStore _store;
+
+ /** Session. */
+ private readonly CacheStoreSessionProxy _sesProxy;
+
+ /** */
+ private readonly long _handle;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheStore" /> class.
+ /// </summary>
+ /// <param name="store">Store.</param>
+ /// <param name="convertPortable">Whether to convert portable objects.</param>
+ /// <param name="registry">The handle registry.</param>
+ private CacheStore(ICacheStore store, bool convertPortable, HandleRegistry registry)
+ {
+ Debug.Assert(store != null);
+
+ _store = store;
+ _convertPortable = convertPortable;
+
+ _sesProxy = new CacheStoreSessionProxy();
+
+ ResourceProcessor.InjectStoreSession(store, _sesProxy);
+
+ _handle = registry.AllocateCritical(this);
+ }
+
+ /// <summary>
+ /// Creates interop cache store from a stream.
+ /// </summary>
+ /// <param name="memPtr">Memory pointer.</param>
+ /// <param name="registry">The handle registry.</param>
+ /// <returns>
+ /// Interop cache store.
+ /// </returns>
+ internal static CacheStore CreateInstance(long memPtr, HandleRegistry registry)
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
+ {
+ var reader = PortableUtils.Marshaller.StartUnmarshal(stream, PortableMode.KeepPortable);
+
+ var assemblyName = reader.ReadString();
+ var className = reader.ReadString();
+ var convertPortable = reader.ReadBoolean();
+ var propertyMap = reader.ReadGenericDictionary<string, object>();
+
+ var store = (ICacheStore) IgniteUtils.CreateInstance(assemblyName, className);
+
+ IgniteUtils.SetProperties(store, propertyMap);
+
+ return new CacheStore(store, convertPortable, registry);
+ }
+ }
+
+ /// <summary>
+ /// Gets the handle.
+ /// </summary>
+ public long Handle
+ {
+ get { return _handle; }
+ }
+
+ /// <summary>
+ /// Initializes this instance with a grid.
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ public void Init(Ignite grid)
+ {
+ ResourceProcessor.Inject(_store, grid);
+ }
+
+ /// <summary>
+ /// Invokes a store operation.
+ /// </summary>
+ /// <param name="input">Input stream.</param>
+ /// <param name="cb">Callback.</param>
+ /// <param name="grid">Grid.</param>
+ /// <returns>Invocation result.</returns>
+ /// <exception cref="IgniteException">Invalid operation type: + opType</exception>
+ public int Invoke(IPortableStream input, IUnmanagedTarget cb, Ignite grid)
+ {
+ IPortableReader reader = grid.Marshaller.StartUnmarshal(input,
+ _convertPortable ? PortableMode.Deserialize : PortableMode.ForcePortable);
+
+ IPortableRawReader rawReader = reader.RawReader();
+
+ int opType = rawReader.ReadByte();
+
+ // Setup cache sessoin for this invocation.
+ long sesId = rawReader.ReadLong();
+
+ CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);
+
+ ses.CacheName = rawReader.ReadString();
+
+ _sesProxy.SetSession(ses);
+
+ try
+ {
+ // Perform operation.
+ switch (opType)
+ {
+ case OpLoadCache:
+ _store.LoadCache((k, v) => WriteObjects(cb, grid, k, v), rawReader.ReadObjectArray<object>());
+
+ break;
+
+ case OpLoad:
+ object val = _store.Load(rawReader.ReadObject<object>());
+
+ if (val != null)
+ WriteObjects(cb, grid, val);
+
+ break;
+
+ case OpLoadAll:
+ var keys = rawReader.ReadCollection();
+
+ var result = _store.LoadAll(keys);
+
+ foreach (DictionaryEntry entry in result)
+ WriteObjects(cb, grid, entry.Key, entry.Value);
+
+ break;
+
+ case OpPut:
+ _store.Write(rawReader.ReadObject<object>(), rawReader.ReadObject<object>());
+
+ break;
+
+ case OpPutAll:
+ _store.WriteAll(rawReader.ReadDictionary());
+
+ break;
+
+ case OpRmv:
+ _store.Delete(rawReader.ReadObject<object>());
+
+ break;
+
+ case OpRmvAll:
+ _store.DeleteAll(rawReader.ReadCollection());
+
+ break;
+
+ case OpSesEnd:
+ grid.HandleRegistry.Release(sesId);
+
+ _store.SessionEnd(rawReader.ReadBoolean());
+
+ break;
+
+ default:
+ throw new IgniteException("Invalid operation type: " + opType);
+ }
+
+ return 0;
+ }
+ finally
+ {
+ _sesProxy.ClearSession();
+ }
+ }
+
+ /// <summary>
+ /// Writes objects to the marshaller.
+ /// </summary>
+ /// <param name="cb">Optional callback.</param>
+ /// <param name="grid">Grid.</param>
+ /// <param name="objects">Objects.</param>
+ private static void WriteObjects(IUnmanagedTarget cb, Ignite grid, params object[] objects)
+ {
+ using (var stream = IgniteManager.Memory.Allocate().Stream())
+ {
+ PortableWriterImpl writer = grid.Marshaller.StartMarshal(stream);
+
+ try
+ {
+ foreach (var obj in objects)
+ {
+ writer.DetachNext();
+ writer.WriteObject(obj);
+ }
+ }
+ finally
+ {
+ grid.Marshaller.FinishMarshal(writer);
+ }
+
+ if (cb != null)
+ {
+ stream.SynchronizeOutput();
+
+ UnmanagedUtils.CacheStoreCallbackInvoke(cb, stream.MemoryPointer);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs
new file mode 100644
index 0000000..f771fe8
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Store
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cache.Store;
+
+ /// <summary>
+ /// Store session implementation.
+ /// </summary>
+ internal class CacheStoreSession : ICacheStoreSession
+ {
+ /** Properties. */
+ private IDictionary<object, object> _props;
+
+ /** <inheritdoc /> */
+
+ public string CacheName
+ {
+ get; internal set;
+ }
+
+ /** <inheritdoc /> */
+ public IDictionary<object, object> Properties
+ {
+ get { return _props ?? (_props = new Dictionary<object, object>(2)); }
+ }
+
+ /// <summary>
+ /// Clear session state.
+ /// </summary>
+ public void Clear()
+ {
+ if (_props != null)
+ _props.Clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs
new file mode 100644
index 0000000..3dd7354
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Store
+{
+ using System.Collections.Generic;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Store;
+
+ /// <summary>
+ /// Store session proxy.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+ internal class CacheStoreSessionProxy : ICacheStoreSession
+ {
+ /** Session. */
+ private readonly ThreadLocal<CacheStoreSession> _target = new ThreadLocal<CacheStoreSession>();
+
+ /** <inheritdoc /> */
+ public string CacheName
+ {
+ get { return _target.Value.CacheName; }
+ }
+
+ /** <inheritdoc /> */
+ public IDictionary<object, object> Properties
+ {
+ get { return _target.Value.Properties; }
+ }
+
+ /// <summary>
+ /// Set thread-bound session.
+ /// </summary>
+ /// <param name="ses">Session.</param>
+ internal void SetSession(CacheStoreSession ses)
+ {
+ _target.Value = ses;
+ }
+
+ /// <summary>
+ /// Clear thread-bound session.
+ /// </summary>
+ internal void ClearSession()
+ {
+ _target.Value = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
new file mode 100644
index 0000000..d26f52e
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using System.Threading;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute;
+ using Apache.Ignite.Core.Impl.Events;
+ using Apache.Ignite.Core.Impl.Messaging;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.Metadata;
+ using Apache.Ignite.Core.Impl.Services;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Messaging;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Services;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Ignite projection implementation.
+ /// </summary>
+ internal class ClusterGroupImpl : PlatformTarget, IClusterGroupEx
+ {
+ /** Attribute: platform. */
+ private const string AttrPlatform = "org.apache.ignite.platform";
+
+ /** Platform. */
+ private const string Platform = "dotnet";
+
+ /** Initial topver; invalid from Java perspective, so update will be triggered when this value is met. */
+ private const int TopVerInit = 0;
+
+ /** */
+ private const int OpAllMetadata = 1;
+
+ /** */
+ private const int OpForAttribute = 2;
+
+ /** */
+ private const int OpForCache = 3;
+
+ /** */
+ private const int OpForClient = 4;
+
+ /** */
+ private const int OpForData = 5;
+
+ /** */
+ private const int OpForHost = 6;
+
+ /** */
+ private const int OpForNodeIds = 7;
+
+ /** */
+ private const int OpMetadata = 8;
+
+ /** */
+ private const int OpMetrics = 9;
+
+ /** */
+ private const int OpMetricsFiltered = 10;
+
+ /** */
+ private const int OpNodeMetrics = 11;
+
+ /** */
+ private const int OpNodes = 12;
+
+ /** */
+ private const int OpPingNode = 13;
+
+ /** */
+ private const int OpTopology = 14;
+
+ /** Initial Ignite instance. */
+ private readonly Ignite _ignite;
+
+ /** Predicate. */
+ private readonly Func<IClusterNode, bool> _pred;
+
+ /** Topology version. */
+ [SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")]
+ private long _topVer = TopVerInit;
+
+ /** Nodes for the given topology version. */
+ private volatile IList<IClusterNode> _nodes;
+
+ /** Processor. */
+ private readonly IUnmanagedTarget _proc;
+
+ /** Compute. */
+ private readonly Lazy<Compute> _comp;
+
+ /** Messaging. */
+ private readonly Lazy<Messaging> _msg;
+
+ /** Events. */
+ private readonly Lazy<Events> _events;
+
+ /** Services. */
+ private readonly Lazy<IServices> _services;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="proc">Processor.</param>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="ignite">Grid.</param>
+ /// <param name="pred">Predicate.</param>
+ public ClusterGroupImpl(IUnmanagedTarget proc, IUnmanagedTarget target, PortableMarshaller marsh,
+ Ignite ignite, Func<IClusterNode, bool> pred)
+ : base(target, marsh)
+ {
+ _proc = proc;
+ _ignite = ignite;
+ _pred = pred;
+
+ _comp = new Lazy<Compute>(() =>
+ new Compute(new ComputeImpl(UU.ProcessorCompute(proc, target), marsh, this, false)));
+
+ _msg = new Lazy<Messaging>(() => new Messaging(UU.ProcessorMessage(proc, target), marsh, this));
+
+ _events = new Lazy<Events>(() => new Events(UU.ProcessorEvents(proc, target), marsh, this));
+
+ _services = new Lazy<IServices>(() =>
+ new Services(UU.ProcessorServices(proc, target), marsh, this, false, false));
+ }
+
+ /** <inheritDoc /> */
+ public IIgnite Ignite
+ {
+ get { return _ignite; }
+ }
+
+ /** <inheritDoc /> */
+ public ICompute Compute()
+ {
+ return _comp.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodes(IEnumerable<IClusterNode> nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes, "nodes");
+
+ return ForNodeIds0(nodes, node => node.Id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodes(params IClusterNode[] nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes, "nodes");
+
+ return ForNodeIds0(nodes, node => node.Id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodeIds(IEnumerable<Guid> ids)
+ {
+ IgniteArgumentCheck.NotNull(ids, "ids");
+
+ return ForNodeIds0(ids, null);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodeIds(params Guid[] ids)
+ {
+ IgniteArgumentCheck.NotNull(ids, "ids");
+
+ return ForNodeIds0(ids, null);
+ }
+
+ /// <summary>
+ /// Internal routine to get projection for specific node IDs.
+ /// </summary>
+ /// <param name="items">Items.</param>
+ /// <param name="func">Function to transform item to Guid (optional).</param>
+ /// <returns></returns>
+ private IClusterGroup ForNodeIds0<T>(IEnumerable<T> items, Func<T, Guid> func)
+ {
+ Debug.Assert(items != null);
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForNodeIds, writer =>
+ {
+ WriteEnumerable(writer, items, func);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForPredicate(Func<IClusterNode, bool> p)
+ {
+ var newPred = _pred == null ? p : node => _pred(node) && p(node);
+
+ return new ClusterGroupImpl(_proc, Target, Marshaller, _ignite, newPred);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForAttribute(string name, string val)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForAttribute, writer =>
+ {
+ writer.WriteString(name);
+ writer.WriteString(val);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /// <summary>
+ /// Creates projection with a specified op.
+ /// </summary>
+ /// <param name="name">Cache name to include into projection.</param>
+ /// <param name="op">Operation id.</param>
+ /// <returns>
+ /// Projection over nodes that have specified cache running.
+ /// </returns>
+ private IClusterGroup ForCacheNodes(string name, int op)
+ {
+ IUnmanagedTarget prj = DoProjetionOutOp(op, writer =>
+ {
+ writer.WriteString(name);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForCacheNodes(string name)
+ {
+ return ForCacheNodes(name, OpForCache);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForDataNodes(string name)
+ {
+ return ForCacheNodes(name, OpForData);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForClientNodes(string name)
+ {
+ return ForCacheNodes(name, OpForClient);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForRemotes()
+ {
+ return GetClusterGroup(UU.ProjectionForRemotes(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForHost(IClusterNode node)
+ {
+ IgniteArgumentCheck.NotNull(node, "node");
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForHost, writer =>
+ {
+ writer.WriteGuid(node.Id);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForRandom()
+ {
+ return GetClusterGroup(UU.ProjectionForRandom(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForOldest()
+ {
+ return GetClusterGroup(UU.ProjectionForOldest(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForYoungest()
+ {
+ return GetClusterGroup(UU.ProjectionForYoungest(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForDotNet()
+ {
+ return ForAttribute(AttrPlatform, Platform);
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<IClusterNode> Nodes()
+ {
+ return RefreshNodes();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterNode Node(Guid id)
+ {
+ return Nodes().FirstOrDefault(node => node.Id == id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterNode Node()
+ {
+ return Nodes().FirstOrDefault();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterMetrics Metrics()
+ {
+ if (_pred == null)
+ {
+ return DoInOp(OpMetrics, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
+ }
+ return DoOutInOp(OpMetricsFiltered, writer =>
+ {
+ WriteEnumerable(writer, Nodes().Select(node => node.Id));
+ }, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
+ }
+
+ /** <inheritDoc /> */
+ public IMessaging Message()
+ {
+ return _msg.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IEvents Events()
+ {
+ return _events.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IServices Services()
+ {
+ return _services.Value;
+ }
+
+ /// <summary>
+ /// Pings a remote node.
+ /// </summary>
+ /// <param name="nodeId">ID of a node to ping.</param>
+ /// <returns>True if node for a given ID is alive, false otherwise.</returns>
+ internal bool PingNode(Guid nodeId)
+ {
+ return DoOutOp(OpPingNode, nodeId) == True;
+ }
+
+ /// <summary>
+ /// Predicate (if any).
+ /// </summary>
+ public Func<IClusterNode, bool> Predicate
+ {
+ get { return _pred; }
+ }
+
+ /// <summary>
+ /// Refresh cluster node metrics.
+ /// </summary>
+ /// <param name="nodeId">Node</param>
+ /// <param name="lastUpdateTime"></param>
+ /// <returns></returns>
+ internal ClusterMetricsImpl RefreshClusterNodeMetrics(Guid nodeId, long lastUpdateTime)
+ {
+ return DoOutInOp(OpNodeMetrics, writer =>
+ {
+ writer.WriteGuid(nodeId);
+ writer.WriteLong(lastUpdateTime);
+ }, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ }
+ );
+ }
+
+ /// <summary>
+ /// Gets a topology by version. Returns null if topology history storage doesn't contain
+ /// specified topology version (history currently keeps the last 1000 snapshots).
+ /// </summary>
+ /// <param name="version">Topology version.</param>
+ /// <returns>Collection of Ignite nodes which represented by specified topology version,
+ /// if it is present in history storage, {@code null} otherwise.</returns>
+ /// <exception cref="IgniteException">If underlying SPI implementation does not support
+ /// topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+ /// supports topology history.</exception>
+ internal ICollection<IClusterNode> Topology(long version)
+ {
+ return DoOutInOp(OpTopology, writer => writer.WriteLong(version),
+ input => IgniteUtils.ReadNodes(Marshaller.StartUnmarshal(input)));
+ }
+
+ /// <summary>
+ /// Topology version.
+ /// </summary>
+ internal long TopologyVersion
+ {
+ get
+ {
+ RefreshNodes();
+
+ return Interlocked.Read(ref _topVer);
+ }
+ }
+
+ /// <summary>
+ /// Update topology.
+ /// </summary>
+ /// <param name="newTopVer">New topology version.</param>
+ /// <param name="newNodes">New nodes.</param>
+ internal void UpdateTopology(long newTopVer, List<IClusterNode> newNodes)
+ {
+ lock (this)
+ {
+ // If another thread already advanced topology version further, we still
+ // can safely return currently received nodes, but we will not assign them.
+ if (_topVer < newTopVer)
+ {
+ Interlocked.Exchange(ref _topVer, newTopVer);
+
+ _nodes = newNodes.AsReadOnly();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Get current nodes without refreshing the topology.
+ /// </summary>
+ /// <returns>Current nodes.</returns>
+ internal IList<IClusterNode> NodesNoRefresh()
+ {
+ return _nodes;
+ }
+
+ /// <summary>
+ /// Creates new Cluster Group from given native projection.
+ /// </summary>
+ /// <param name="prj">Native projection.</param>
+ /// <returns>New cluster group.</returns>
+ private IClusterGroup GetClusterGroup(IUnmanagedTarget prj)
+ {
+ return new ClusterGroupImpl(_proc, prj, Marshaller, _ignite, _pred);
+ }
+
+ /// <summary>
+ /// Refresh projection nodes.
+ /// </summary>
+ /// <returns>Nodes.</returns>
+ private IList<IClusterNode> RefreshNodes()
+ {
+ long oldTopVer = Interlocked.Read(ref _topVer);
+
+ List<IClusterNode> newNodes = null;
+
+ DoOutInOp(OpNodes, writer =>
+ {
+ writer.WriteLong(oldTopVer);
+ }, input =>
+ {
+ PortableReaderImpl reader = Marshaller.StartUnmarshal(input);
+
+ if (reader.ReadBoolean())
+ {
+ // Topology has been updated.
+ long newTopVer = reader.ReadLong();
+
+ newNodes = IgniteUtils.ReadNodes(reader, _pred);
+
+ UpdateTopology(newTopVer, newNodes);
+ }
+ });
+
+ if (newNodes != null)
+ return newNodes;
+
+ // No topology changes.
+ Debug.Assert(_nodes != null, "At least one topology update should have occurred.");
+
+ return _nodes;
+ }
+
+ /// <summary>
+ /// Perform synchronous out operation returning value.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action.</param>
+ /// <returns>Native projection.</returns>
+ private IUnmanagedTarget DoProjetionOutOp(int type, Action<PortableWriterImpl> action)
+ {
+ using (var stream = IgniteManager.Memory.Allocate().Stream())
+ {
+ var writer = Marshaller.StartMarshal(stream);
+
+ action(writer);
+
+ FinishMarshal(writer);
+
+ return UU.ProjectionOutOpRet(Target, type, stream.SynchronizeOutput());
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IPortableMetadata Metadata(int typeId)
+ {
+ return DoOutInOp<IPortableMetadata>(OpMetadata,
+ writer =>
+ {
+ writer.WriteInt(typeId);
+ },
+ stream =>
+ {
+ PortableReaderImpl reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null;
+ }
+ );
+ }
+
+ /// <summary>
+ /// Gets metadata for all known types.
+ /// </summary>
+ public List<IPortableMetadata> Metadata()
+ {
+ return DoInOp(OpAllMetadata, s =>
+ {
+ var reader = Marshaller.StartUnmarshal(s);
+
+ var size = reader.ReadInt();
+
+ var res = new List<IPortableMetadata>(size);
+
+ for (var i = 0; i < size; i++)
+ res.Add(reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null);
+
+ return res;
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
new file mode 100644
index 0000000..664a1f1
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Cluster metrics implementation.
+ /// </summary>
+ internal class ClusterMetricsImpl : IClusterMetrics
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClusterMetricsImpl"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ClusterMetricsImpl(IPortableRawReader reader)
+ {
+ LastUpdateTimeRaw = reader.ReadLong();
+
+ DateTime? lastUpdateTime0 = reader.ReadDate();
+
+ LastUpdateTime = lastUpdateTime0 ?? default(DateTime);
+ MaximumActiveJobs = reader.ReadInt();
+ CurrentActiveJobs = reader.ReadInt();
+ AverageActiveJobs = reader.ReadFloat();
+ MaximumWaitingJobs = reader.ReadInt();
+
+ CurrentWaitingJobs = reader.ReadInt();
+ AverageWaitingJobs = reader.ReadFloat();
+ MaximumRejectedJobs = reader.ReadInt();
+ CurrentRejectedJobs = reader.ReadInt();
+ AverageRejectedJobs = reader.ReadFloat();
+
+ TotalRejectedJobs = reader.ReadInt();
+ MaximumCancelledJobs = reader.ReadInt();
+ CurrentCancelledJobs = reader.ReadInt();
+ AverageCancelledJobs = reader.ReadFloat();
+ TotalCancelledJobs = reader.ReadInt();
+
+ TotalExecutedJobs = reader.ReadInt();
+ MaximumJobWaitTime = reader.ReadLong();
+ CurrentJobWaitTime = reader.ReadLong();
+ AverageJobWaitTime = reader.ReadDouble();
+ MaximumJobExecuteTime = reader.ReadLong();
+
+ CurrentJobExecuteTime = reader.ReadLong();
+ AverageJobExecuteTime = reader.ReadDouble();
+ TotalExecutedTasks = reader.ReadInt();
+ TotalIdleTime = reader.ReadLong();
+ CurrentIdleTime = reader.ReadLong();
+
+ TotalCpus = reader.ReadInt();
+ CurrentCpuLoad = reader.ReadDouble();
+ AverageCpuLoad = reader.ReadDouble();
+ CurrentGcCpuLoad = reader.ReadDouble();
+ HeapMemoryInitialized = reader.ReadLong();
+
+ HeapMemoryUsed = reader.ReadLong();
+ HeapMemoryCommitted = reader.ReadLong();
+ HeapMemoryMaximum = reader.ReadLong();
+ HeapMemoryTotal = reader.ReadLong();
+ NonHeapMemoryInitialized = reader.ReadLong();
+
+ NonHeapMemoryUsed = reader.ReadLong();
+ NonHeapMemoryCommitted = reader.ReadLong();
+ NonHeapMemoryMaximum = reader.ReadLong();
+ NonHeapMemoryTotal = reader.ReadLong();
+ UpTime = reader.ReadLong();
+
+ DateTime? startTime0 = reader.ReadDate();
+
+ StartTime = startTime0 ?? default(DateTime);
+
+ DateTime? nodeStartTime0 = reader.ReadDate();
+
+ NodeStartTime = nodeStartTime0 ?? default(DateTime);
+
+ CurrentThreadCount = reader.ReadInt();
+ MaximumThreadCount = reader.ReadInt();
+ TotalStartedThreadCount = reader.ReadLong();
+ CurrentDaemonThreadCount = reader.ReadInt();
+ LastDataVersion = reader.ReadLong();
+
+ SentMessagesCount = reader.ReadInt();
+ SentBytesCount = reader.ReadLong();
+ ReceivedMessagesCount = reader.ReadInt();
+ ReceivedBytesCount = reader.ReadLong();
+ OutboundMessagesQueueSize = reader.ReadInt();
+
+ TotalNodes = reader.ReadInt();
+ }
+
+ /// <summary>
+ /// Last update time in raw format.
+ /// </summary>
+ internal long LastUpdateTimeRaw { get; set; }
+
+ /** <inheritDoc /> */
+ public DateTime LastUpdateTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalExecutedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public long MaximumJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long MaximumJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalExecutedTasks { get; private set; }
+
+ /** <inheritDoc /> */
+ public long TotalBusyTime
+ {
+ get { return UpTime - TotalIdleTime; }
+ }
+
+ /** <inheritDoc /> */
+ public long TotalIdleTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentIdleTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public float BusyTimePercentage
+ {
+ get { return 1 - IdleTimePercentage; }
+ }
+
+ /** <inheritDoc /> */
+ public float IdleTimePercentage
+ {
+ get { return TotalIdleTime / (float) UpTime; }
+ }
+
+ /** <inheritDoc /> */
+ public int TotalCpus { get; private set; }
+
+ /** <inheritDoc /> */
+ public double CurrentCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public double CurrentGcCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryInitialized { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryUsed { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryCommitted { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryMaximum { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryTotal { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryInitialized { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryUsed { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryCommitted { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryMaximum { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryTotal { get; private set; }
+
+ /** <inheritDoc /> */
+ public long UpTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public DateTime StartTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public DateTime NodeStartTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long TotalStartedThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentDaemonThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long LastDataVersion { get; private set; }
+
+ /** <inheritDoc /> */
+ public int SentMessagesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long SentBytesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int ReceivedMessagesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long ReceivedBytesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int OutboundMessagesQueueSize { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalNodes { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
new file mode 100644
index 0000000..59373a2
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Impl.Collections;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Cluster node implementation.
+ /// </summary>
+ internal class ClusterNodeImpl : IClusterNode
+ {
+ /** Node ID. */
+ private readonly Guid _id;
+
+ /** Attributes. */
+ private readonly IDictionary<string, object> _attrs;
+
+ /** Addresses. */
+ private readonly ICollection<string> _addrs;
+
+ /** Hosts. */
+ private readonly ICollection<string> _hosts;
+
+ /** Order. */
+ private readonly long _order;
+
+ /** Local flag. */
+ private readonly bool _local;
+
+ /** Daemon flag. */
+ private readonly bool _daemon;
+
+ /** Metrics. */
+ private volatile ClusterMetricsImpl _metrics;
+
+ /** Ignite reference. */
+ private WeakReference _igniteRef;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClusterNodeImpl"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ClusterNodeImpl(IPortableRawReader reader)
+ {
+ _id = reader.ReadGuid() ?? default(Guid);
+
+ _attrs = reader.ReadGenericDictionary<string, object>().AsReadOnly();
+ _addrs = reader.ReadGenericCollection<string>().AsReadOnly();
+ _hosts = reader.ReadGenericCollection<string>().AsReadOnly();
+ _order = reader.ReadLong();
+ _local = reader.ReadBoolean();
+ _daemon = reader.ReadBoolean();
+
+ _metrics = reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ }
+
+ /** <inheritDoc /> */
+ public Guid Id
+ {
+ get { return _id; }
+ }
+
+ /** <inheritDoc /> */
+ public T Attribute<T>(string name)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ return (T)_attrs[name];
+ }
+
+ /** <inheritDoc /> */
+ public bool TryGetAttribute<T>(string name, out T attr)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ object val;
+
+ if (_attrs.TryGetValue(name, out val))
+ {
+ attr = (T)val;
+
+ return true;
+ }
+ attr = default(T);
+
+ return false;
+ }
+
+ /** <inheritDoc /> */
+ public IDictionary<string, object> Attributes()
+ {
+ return _attrs;
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<string> Addresses
+ {
+ get
+ {
+ return _addrs;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<string> HostNames
+ {
+ get
+ {
+ return _hosts;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public long Order
+ {
+ get
+ {
+ return _order;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public bool IsLocal
+ {
+ get
+ {
+ return _local;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public bool IsDaemon
+ {
+ get
+ {
+ return _daemon;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IClusterMetrics Metrics()
+ {
+ var ignite = (Ignite)_igniteRef.Target;
+
+ if (ignite == null)
+ return _metrics;
+
+ ClusterMetricsImpl oldMetrics = _metrics;
+
+ long lastUpdateTime = oldMetrics.LastUpdateTimeRaw;
+
+ ClusterMetricsImpl newMetrics = ignite.ClusterGroup.RefreshClusterNodeMetrics(_id, lastUpdateTime);
+
+ if (newMetrics != null)
+ {
+ lock (this)
+ {
+ if (_metrics.LastUpdateTime < newMetrics.LastUpdateTime)
+ _metrics = newMetrics;
+ }
+
+ return newMetrics;
+ }
+
+ return oldMetrics;
+ }
+
+ /** <inheritDoc /> */
+ public override string ToString()
+ {
+ return "GridNode [id=" + Id + ']';
+ }
+
+ /** <inheritDoc /> */
+ public override bool Equals(object obj)
+ {
+ ClusterNodeImpl node = obj as ClusterNodeImpl;
+
+ if (node != null)
+ return _id.Equals(node._id);
+
+ return false;
+ }
+
+ /** <inheritDoc /> */
+ public override int GetHashCode()
+ {
+ // ReSharper disable once NonReadonlyMemberInGetHashCode
+ return _id.GetHashCode();
+ }
+
+ /// <summary>
+ /// Initializes this instance with a grid.
+ /// </summary>
+ /// <param name="grid">The grid.</param>
+ internal void Init(Ignite grid)
+ {
+ _igniteRef = new WeakReference(grid);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
new file mode 100644
index 0000000..554eb0a
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ ///
+ /// </summary>
+ internal interface IClusterGroupEx : IClusterGroup
+ {
+ /// <summary>
+ /// Gets protable metadata for type.
+ /// </summary>
+ /// <param name="typeId">Type ID.</param>
+ /// <returns>Metadata.</returns>
+ IPortableMetadata Metadata(int typeId);
+ }
+}